From da309f21da880572653b39b7ea1e891cc00e9581 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Tue, 29 Dec 2020 19:23:01 +0000 Subject: [PATCH] cephadm: introduce context, drop global variables Signed-off-by: Joao Eduardo Luis --- src/cephadm/cephadm | 1549 +++++++++++++++++++++++-------------------- 1 file changed, 845 insertions(+), 704 deletions(-) diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index f5afbdc9a3618..410ae268df6f9 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -95,11 +95,33 @@ else: if sys.version_info > (3, 0): unicode = str -container_path = '' cached_stdin = None DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ' + +logger: logging.Logger = None # type: ignore + +################################## + +class CephadmContext: + + def __init__(self): + self._args: argparse.Namespace = None # type: ignore + self.container_path: str = None # type: ignore + + @property + def args(self) -> argparse.Namespace: + return self._args + + @args.setter + def args(self, args: argparse.Namespace) -> None: + self._args = args + + +################################## + + # Log and console output config logging_config = { 'version': 1, @@ -242,11 +264,13 @@ class NFSGanesha(object): } def __init__(self, + ctx, fsid, daemon_id, config_json, image=DEFAULT_IMAGE): - # type: (str, Union[int, str], Dict, str) -> None + # type: (CephadmContext, str, Union[int, str], Dict, str) -> None + self.ctx = ctx self.fsid = fsid self.daemon_id = daemon_id self.image = image @@ -263,9 +287,10 @@ class NFSGanesha(object): self.validate() @classmethod - def init(cls, fsid, daemon_id): - # type: (str, Union[int, str]) -> NFSGanesha - return cls(fsid, daemon_id, get_parm(args.config_json), args.image) + def init(cls, ctx, fsid, daemon_id): + # type: (CephadmContext, str, Union[int, str]) -> NFSGanesha + return cls(ctx, fsid, daemon_id, get_parm(ctx.args.config_json), + ctx.args.image) def get_container_mounts(self, data_dir): # type: (str) -> Dict[str, str] @@ -289,11 +314,11 
@@ class NFSGanesha(object): return envs @staticmethod - def get_version(container_id): - # type: (str) -> Optional[str] + def get_version(ctx, container_id): + # type: (CephadmContext, str) -> Optional[str] version = None - out, err, code = call( - [container_path, 'exec', container_id, + out, err, code = call(ctx, + [ctx.container_path, 'exec', container_id, NFSGanesha.entrypoint, '-v']) if code == 0: match = re.search(r'NFS-Ganesha Release\s*=\s*[V]*([\d.]+)', out) @@ -374,12 +399,14 @@ class NFSGanesha(object): args += ['--userid', self.userid] args += [action, self.get_daemon_name()] - data_dir = get_data_dir(self.fsid, self.daemon_type, self.daemon_id) + data_dir = get_data_dir(self.fsid, self.ctx.args.data_dir, + self.daemon_type, self.daemon_id) volume_mounts = self.get_container_mounts(data_dir) envs = self.get_container_envs() logger.info('Creating RADOS grace for action: %s' % action) c = CephContainer( + self.ctx, image=self.image, entrypoint=entrypoint, args=args, @@ -401,11 +428,13 @@ class CephIscsi(object): required_files = ['iscsi-gateway.cfg'] def __init__(self, + ctx, fsid, daemon_id, config_json, image=DEFAULT_IMAGE): - # type: (str, Union[int, str], Dict, str) -> None + # type: (CephadmContext, str, Union[int, str], Dict, str) -> None + self.ctx = ctx self.fsid = fsid self.daemon_id = daemon_id self.image = image @@ -417,9 +446,10 @@ class CephIscsi(object): self.validate() @classmethod - def init(cls, fsid, daemon_id): - # type: (str, Union[int, str]) -> CephIscsi - return cls(fsid, daemon_id, get_parm(args.config_json), args.image) + def init(cls, ctx, fsid, daemon_id): + # type: (CephadmContext, str, Union[int, str]) -> CephIscsi + return cls(ctx, fsid, daemon_id, + get_parm(ctx.args.config_json), ctx.args.image) @staticmethod def get_container_mounts(data_dir, log_dir): @@ -445,11 +475,11 @@ class CephIscsi(object): return binds @staticmethod - def get_version(container_id): - # type: (str) -> Optional[str] + def get_version(ctx, 
container_id): + # type: (CephadmContext, str) -> Optional[str] version = None - out, err, code = call( - [container_path, 'exec', container_id, + out, err, code = call(ctx, + [ctx.container_path, 'exec', container_id, '/usr/bin/python3', '-c', "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"]) if code == 0: version = out.strip() @@ -508,7 +538,7 @@ class CephIscsi(object): def get_tcmu_runner_container(self): # type: () -> CephContainer - tcmu_container = get_container(self.fsid, self.daemon_type, self.daemon_id) + tcmu_container = get_container(self.ctx, self.fsid, self.daemon_type, self.daemon_id) tcmu_container.entrypoint = "/usr/bin/tcmu-runner" tcmu_container.cname = self.get_container_name(desc='tcmu') # remove extra container args for tcmu container. @@ -524,8 +554,11 @@ class HAproxy(object): required_files = ['haproxy.cfg'] default_image = 'haproxy' - def __init__(self, fsid: str, daemon_id: Union[int, str], + def __init__(self, + ctx: CephadmContext, + fsid: str, daemon_id: Union[int, str], config_json: Dict, image: str) -> None: + self.ctx = ctx self.fsid = fsid self.daemon_id = daemon_id self.image = image @@ -536,8 +569,10 @@ class HAproxy(object): self.validate() @classmethod - def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'HAproxy': - return cls(fsid, daemon_id, get_parm(args.config_json), args.image) + def init(cls, ctx: CephadmContext, + fsid: str, daemon_id: Union[int, str]) -> 'HAproxy': + return cls(ctx, fsid, daemon_id, get_parm(ctx.args.config_json), + ctx.args.image) def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: """Create files under the container data dir""" @@ -582,7 +617,7 @@ class HAproxy(object): def extract_uid_gid_haproxy(self): # better directory for this? 
- return extract_uid_gid(file_path='/var/lib') + return extract_uid_gid(self.ctx, file_path='/var/lib') @staticmethod def get_container_mounts(data_dir: str) -> Dict[str, str]: @@ -599,8 +634,11 @@ class Keepalived(object): required_files = ['keepalived.conf'] default_image = 'arcts/keepalived' - def __init__(self, fsid: str, daemon_id: Union[int, str], + def __init__(self, + ctx: CephadmContext, + fsid: str, daemon_id: Union[int, str], config_json: Dict, image: str) -> None: + self.ctx = ctx self.fsid = fsid self.daemon_id = daemon_id self.image = image @@ -611,8 +649,10 @@ class Keepalived(object): self.validate() @classmethod - def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'Keepalived': - return cls(fsid, daemon_id, get_parm(args.config_json), args.image) + def init(cls, ctx: CephadmContext, fsid: str, + daemon_id: Union[int, str]) -> 'Keepalived': + return cls(ctx, fsid, daemon_id, + get_parm(ctx.args.config_json), ctx.args.image) def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: """Create files under the container data dir""" @@ -665,7 +705,7 @@ class Keepalived(object): def extract_uid_gid_keepalived(self): # better directory for this? 
- return extract_uid_gid(file_path='/var/lib') + return extract_uid_gid(self.ctx, file_path='/var/lib') @staticmethod def get_container_mounts(data_dir: str) -> Dict[str, str]: @@ -680,8 +720,10 @@ class CustomContainer(object): """Defines a custom container""" daemon_type = 'container' - def __init__(self, fsid: str, daemon_id: Union[int, str], + def __init__(self, ctx: CephadmContext, + fsid: str, daemon_id: Union[int, str], config_json: Dict, image: str) -> None: + self.ctx = ctx self.fsid = fsid self.daemon_id = daemon_id self.image = image @@ -700,8 +742,10 @@ class CustomContainer(object): self.files = dict_get(config_json, 'files', {}) @classmethod - def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'CustomContainer': - return cls(fsid, daemon_id, get_parm(args.config_json), args.image) + def init(cls, ctx: CephadmContext, + fsid: str, daemon_id: Union[int, str]) -> 'CustomContainer': + return cls(ctx, fsid, daemon_id, + get_parm(ctx.args.config_json), ctx.args.image) def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: """ @@ -844,8 +888,8 @@ def get_supported_daemons(): ################################## -def attempt_bind(s, address, port): - # type: (socket.socket, str, int) -> None +def attempt_bind(ctx, s, address, port): + # type: (CephadmContext, socket.socket, str, int) -> None try: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((address, port)) @@ -860,33 +904,33 @@ def attempt_bind(s, address, port): s.close() -def port_in_use(port_num): - # type: (int) -> bool +def port_in_use(ctx, port_num): + # type: (CephadmContext, int) -> bool """Detect whether a port is in use on the local machine - IPv4 and IPv6""" logger.info('Verifying port %d ...' 
% port_num) try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - attempt_bind(s, '0.0.0.0', port_num) + attempt_bind(ctx, s, '0.0.0.0', port_num) s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - attempt_bind(s, '::', port_num) + attempt_bind(ctx, s, '::', port_num) except OSError: return True else: return False -def check_ip_port(ip, port): - # type: (str, int) -> None - if not args.skip_ping_check: +def check_ip_port(ctx, ip, port): + # type: (CephadmContext, str, int) -> None + if not ctx.args.skip_ping_check: logger.info('Verifying IP %s port %d ...' % (ip, port)) - if is_ipv6(ip): + if is_ipv6(ctx, ip): s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) ip = unwrap_ipv6(ip) else: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: - attempt_bind(s, ip, port) + attempt_bind(ctx, s, ip, port) except OSError as e: raise Error(e) @@ -929,10 +973,11 @@ class _Acquire_ReturnProxy(object): class FileLock(object): - def __init__(self, name, timeout=-1): + def __init__(self, ctx: CephadmContext, name, timeout=-1): if not os.path.exists(LOCK_DIR): os.mkdir(LOCK_DIR, 0o700) self._lock_file = os.path.join(LOCK_DIR, name + '.lock') + self.ctx = ctx # The file descriptor for the *_lock_file* as it is returned by the # os.open() function. @@ -977,6 +1022,7 @@ class FileLock(object): This method returns now a *proxy* object instead of *self*, so that it can be used in a with statement without side effects. """ + # Use the default timeout, if no timeout is provided. 
if timeout is None: timeout = self.timeout @@ -1090,7 +1136,8 @@ class CallVerbosity(Enum): VERBOSE = 3 -def call(command: List[str], +def call(ctx: CephadmContext, + command: List[str], desc: Optional[str] = None, verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, timeout: Optional[int] = DEFAULT_TIMEOUT, @@ -1104,11 +1151,12 @@ def call(command: List[str], :param timeout: timeout in seconds """ + if desc is None: desc = command[0] if desc: desc += ': ' - timeout = timeout or args.timeout + timeout = timeout or ctx.args.timeout logger.debug("Running command: %s" % ' '.join(command)) process = subprocess.Popen( @@ -1218,20 +1266,21 @@ def call(command: List[str], return out, err, returncode -def call_throws(command: List[str], +def call_throws( + ctx: CephadmContext, + command: List[str], desc: Optional[str] = None, verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, timeout: Optional[int] = DEFAULT_TIMEOUT, **kwargs) -> Tuple[str, str, int]: - out, err, ret = call(command, desc, verbosity, timeout, **kwargs) + out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs) if ret: raise RuntimeError('Failed command: %s' % ' '.join(command)) return out, err, ret -def call_timeout(command, timeout): - # type: (List[str], int) -> int - +def call_timeout(ctx, command, timeout): + # type: (CephadmContext, List[str], int) -> int logger.debug('Running command (timeout=%s): %s' % (timeout, ' '.join(command))) @@ -1271,15 +1320,15 @@ def call_timeout(command, timeout): ################################## -def is_available(what, func): - # type: (str, Callable[[], bool]) -> None +def is_available(ctx, what, func): + # type: (CephadmContext, str, Callable[[], bool]) -> None """ Wait for a service to become available :param what: the name of the service :param func: the callable object that determines availability """ - retry = args.retry + retry = ctx.args.retry logger.info('Waiting for %s...' 
% what) num = 1 while True: @@ -1379,11 +1428,11 @@ def try_convert_datetime(s): return None -def get_podman_version(): - # type: () -> Tuple[int, ...] +def get_podman_version(ctx, container_path): + # type: (CephadmContext, str) -> Tuple[int, ...] if 'podman' not in container_path: raise ValueError('not using podman') - out, _, _ = call_throws([container_path, '--version']) + out, _, _ = call_throws(ctx, [container_path, '--version']) return _parse_podman_version(out) @@ -1460,22 +1509,22 @@ def infer_fsid(func): If we only find a single fsid in /var/lib/ceph/*, use that """ @wraps(func) - def _infer_fsid(): - if args.fsid: - logger.debug('Using specified fsid: %s' % args.fsid) - return func() + def _infer_fsid(ctx: CephadmContext): + if ctx.args.fsid: + logger.debug('Using specified fsid: %s' % ctx.args.fsid) + return func(ctx) fsids_set = set() - daemon_list = list_daemons(detail=False) + daemon_list = list_daemons(ctx, detail=False) for daemon in daemon_list: if not is_fsid(daemon['fsid']): # 'unknown' fsid continue - elif 'name' not in args or not args.name: - # args.name not specified + elif 'name' not in ctx.args or not ctx.args.name: + # ctx.args.name not specified fsids_set.add(daemon['fsid']) - elif daemon['name'] == args.name: - # args.name is a match + elif daemon['name'] == ctx.args.name: + # ctx.args.name is a match fsids_set.add(daemon['fsid']) fsids = sorted(fsids_set) @@ -1484,10 +1533,10 @@ def infer_fsid(func): pass elif len(fsids) == 1: logger.info('Inferring fsid %s' % fsids[0]) - args.fsid = fsids[0] + ctx.args.fsid = fsids[0] else: raise Error('Cannot infer an fsid, one must be specified: %s' % fsids) - return func() + return func(ctx) return _infer_fsid @@ -1497,33 +1546,34 @@ def infer_config(func): If we find a MON daemon, use the config from that container """ @wraps(func) - def _infer_config(): - if args.config: - logger.debug('Using specified config: %s' % args.config) - return func() + def _infer_config(ctx: CephadmContext): + if 
ctx.args.config: + logger.debug('Using specified config: %s' % ctx.args.config) + return func(ctx) config = None - if args.fsid: - name = args.name + if ctx.args.fsid: + name = ctx.args.name if not name: - daemon_list = list_daemons(detail=False) + daemon_list = list_daemons(ctx, detail=False) for daemon in daemon_list: if daemon['name'].startswith('mon.'): name = daemon['name'] break if name: - config = '/var/lib/ceph/{}/{}/config'.format(args.fsid, name) + config = '/var/lib/ceph/{}/{}/config'.format(ctx.args.fsid, + name) if config: logger.info('Inferring config %s' % config) - args.config = config + ctx.args.config = config elif os.path.exists(SHELL_DEFAULT_CONF): logger.debug('Using default config: %s' % SHELL_DEFAULT_CONF) - args.config = SHELL_DEFAULT_CONF - return func() + ctx.args.config = SHELL_DEFAULT_CONF + return func(ctx) return _infer_config -def _get_default_image(): +def _get_default_image(ctx: CephadmContext): if DEFAULT_IMAGE_IS_MASTER: warn = '''This is a development version of cephadm. 
For information regarding the latest stable release: @@ -1539,54 +1589,54 @@ def infer_image(func): Use the most recent ceph image """ @wraps(func) - def _infer_image(): - if not args.image: - args.image = os.environ.get('CEPHADM_IMAGE') - if not args.image: - args.image = get_last_local_ceph_image() - if not args.image: - args.image = _get_default_image() - return func() + def _infer_image(ctx: CephadmContext): + if not ctx.args.image: + ctx.args.image = os.environ.get('CEPHADM_IMAGE') + if not ctx.args.image: + ctx.args.image = get_last_local_ceph_image(ctx, ctx.container_path) + if not ctx.args.image: + ctx.args.image = _get_default_image(ctx) + return func(ctx) return _infer_image def default_image(func): @wraps(func) - def _default_image(): - if not args.image: - if 'name' in args and args.name: - type_ = args.name.split('.', 1)[0] + def _default_image(ctx: CephadmContext): + if not ctx.args.image: + if 'name' in ctx.args and ctx.args.name: + type_ = ctx.args.name.split('.', 1)[0] if type_ in Monitoring.components: - args.image = Monitoring.components[type_]['image'] + ctx.args.image = Monitoring.components[type_]['image'] if type_ == 'haproxy': - args.image = HAproxy.default_image + ctx.args.image = HAproxy.default_image if type_ == 'keepalived': - args.image = Keepalived.default_image - if not args.image: - args.image = os.environ.get('CEPHADM_IMAGE') - if not args.image: - args.image = _get_default_image() + ctx.args.image = Keepalived.default_image + if not ctx.args.image: + ctx.args.image = os.environ.get('CEPHADM_IMAGE') + if not ctx.args.image: + ctx.args.image = _get_default_image(ctx) - return func() + return func(ctx) return _default_image -def get_last_local_ceph_image(): +def get_last_local_ceph_image(ctx: CephadmContext, container_path: str): """ :return: The most recent local ceph image (already pulled) """ - out, _, _ = call_throws( + out, _, _ = call_throws(ctx, [container_path, 'images', '--filter', 'label=ceph=True', '--filter', 
'dangling=false', '--format', '{{.Repository}}@{{.Digest}}']) - return _filter_last_local_ceph_image(out) + return _filter_last_local_ceph_image(ctx, out) -def _filter_last_local_ceph_image(out): - # str -> Optional[str] +def _filter_last_local_ceph_image(ctx, out): + # type: (CephadmContext, str) -> Optional[str] for image in out.splitlines(): if image and not image.endswith('@'): logger.info('Using recent ceph image %s' % image) @@ -1615,19 +1665,19 @@ def makedirs(dir, uid, gid, mode): os.chmod(dir, mode) # the above is masked by umask... -def get_data_dir(fsid, t, n): - # type: (str, str, Union[int, str]) -> str - return os.path.join(args.data_dir, fsid, '%s.%s' % (t, n)) +def get_data_dir(fsid, data_dir, t, n): + # type: (str, str, str, Union[int, str]) -> str + return os.path.join(data_dir, fsid, '%s.%s' % (t, n)) -def get_log_dir(fsid): - # type: (str) -> str - return os.path.join(args.log_dir, fsid) +def get_log_dir(fsid, log_dir): + # type: (str, str) -> str + return os.path.join(log_dir, fsid) -def make_data_dir_base(fsid, uid, gid): - # type: (str, int, int) -> str - data_dir_base = os.path.join(args.data_dir, fsid) +def make_data_dir_base(fsid, data_dir, uid, gid): + # type: (str, str, int, int) -> str + data_dir_base = os.path.join(data_dir, fsid) makedirs(data_dir_base, uid, gid, DATA_DIR_MODE) makedirs(os.path.join(data_dir_base, 'crash'), uid, gid, DATA_DIR_MODE) makedirs(os.path.join(data_dir_base, 'crash', 'posted'), uid, gid, @@ -1635,38 +1685,38 @@ def make_data_dir_base(fsid, uid, gid): return data_dir_base -def make_data_dir(fsid, daemon_type, daemon_id, uid=None, gid=None): - # type: (str, str, Union[int, str], Optional[int], Optional[int]) -> str +def make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=None, gid=None): + # type: (CephadmContext, str, str, Union[int, str], Optional[int], Optional[int]) -> str if uid is None or gid is None: - uid, gid = extract_uid_gid() - make_data_dir_base(fsid, uid, gid) - data_dir = get_data_dir(fsid, 
daemon_type, daemon_id) + uid, gid = extract_uid_gid(ctx) + make_data_dir_base(fsid, ctx.args.data_dir, uid, gid) + data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id) makedirs(data_dir, uid, gid, DATA_DIR_MODE) return data_dir -def make_log_dir(fsid, uid=None, gid=None): - # type: (str, Optional[int], Optional[int]) -> str +def make_log_dir(ctx, fsid, uid=None, gid=None): + # type: (CephadmContext, str, Optional[int], Optional[int]) -> str if uid is None or gid is None: - uid, gid = extract_uid_gid() - log_dir = get_log_dir(fsid) + uid, gid = extract_uid_gid(ctx) + log_dir = get_log_dir(fsid, ctx.args.log_dir) makedirs(log_dir, uid, gid, LOG_DIR_MODE) return log_dir -def make_var_run(fsid, uid, gid): - # type: (str, int, int) -> None - call_throws(['install', '-d', '-m0770', '-o', str(uid), '-g', str(gid), +def make_var_run(ctx, fsid, uid, gid): + # type: (CephadmContext, str, int, int) -> None + call_throws(ctx, ['install', '-d', '-m0770', '-o', str(uid), '-g', str(gid), '/var/run/ceph/%s' % fsid]) -def copy_tree(src, dst, uid=None, gid=None): - # type: (List[str], str, Optional[int], Optional[int]) -> None +def copy_tree(ctx, src, dst, uid=None, gid=None): + # type: (CephadmContext, List[str], str, Optional[int], Optional[int]) -> None """ Copy a directory tree from src to dst """ if uid is None or gid is None: - (uid, gid) = extract_uid_gid() + (uid, gid) = extract_uid_gid(ctx) for src_dir in src: dst_dir = dst @@ -1685,13 +1735,13 @@ def copy_tree(src, dst, uid=None, gid=None): os.chown(os.path.join(dirpath, filename), uid, gid) -def copy_files(src, dst, uid=None, gid=None): - # type: (List[str], str, Optional[int], Optional[int]) -> None +def copy_files(ctx, src, dst, uid=None, gid=None): + # type: (CephadmContext, List[str], str, Optional[int], Optional[int]) -> None """ Copy a files from src to dst """ if uid is None or gid is None: - (uid, gid) = extract_uid_gid() + (uid, gid) = extract_uid_gid(ctx) for src_file in src: dst_file = dst 
@@ -1705,13 +1755,13 @@ def copy_files(src, dst, uid=None, gid=None): os.chown(dst_file, uid, gid) -def move_files(src, dst, uid=None, gid=None): - # type: (List[str], str, Optional[int], Optional[int]) -> None +def move_files(ctx, src, dst, uid=None, gid=None): + # type: (CephadmContext, List[str], str, Optional[int], Optional[int]) -> None """ Move files from src to dst """ if uid is None or gid is None: - (uid, gid) = extract_uid_gid() + (uid, gid) = extract_uid_gid(ctx) for src_file in src: dst_file = dst @@ -1787,23 +1837,23 @@ def get_unit_name(fsid, daemon_type, daemon_id=None): return 'ceph-%s@%s' % (fsid, daemon_type) -def get_unit_name_by_daemon_name(fsid, name): - daemon = get_daemon_description(fsid, name) +def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid, name): + daemon = get_daemon_description(ctx, fsid, name) try: return daemon['systemd_unit'] except KeyError: raise Error('Failed to get unit name for {}'.format(daemon)) -def check_unit(unit_name): - # type: (str) -> Tuple[bool, str, bool] +def check_unit(ctx, unit_name): + # type: (CephadmContext, str) -> Tuple[bool, str, bool] # NOTE: we ignore the exit code here because systemctl outputs # various exit codes based on the state of the service, but the # string result is more explicit (and sufficient). 
enabled = False installed = False try: - out, err, code = call(['systemctl', 'is-enabled', unit_name], + out, err, code = call(ctx, ['systemctl', 'is-enabled', unit_name], verbosity=CallVerbosity.DEBUG) if code == 0: enabled = True @@ -1817,7 +1867,7 @@ def check_unit(unit_name): state = 'unknown' try: - out, err, code = call(['systemctl', 'is-active', unit_name], + out, err, code = call(ctx, ['systemctl', 'is-active', unit_name], verbosity=CallVerbosity.DEBUG) out = out.strip() if out in ['active']: @@ -1834,10 +1884,10 @@ def check_unit(unit_name): return (enabled, state, installed) -def check_units(units, enabler=None): - # type: (List[str], Optional[Packager]) -> bool +def check_units(ctx, units, enabler=None): + # type: (CephadmContext, List[str], Optional[Packager]) -> bool for u in units: - (enabled, state, installed) = check_unit(u) + (enabled, state, installed) = check_unit(ctx, u) if enabled and state == 'running': logger.info('Unit %s is enabled and running' % u) return True @@ -1861,12 +1911,13 @@ def get_legacy_config_fsid(cluster, legacy_dir=None): return None -def get_legacy_daemon_fsid(cluster, daemon_type, daemon_id, legacy_dir=None): - # type: (str, str, Union[int, str], Optional[str]) -> Optional[str] +def get_legacy_daemon_fsid(ctx, cluster, + daemon_type, daemon_id, legacy_dir=None): + # type: (CephadmContext, str, str, Union[int, str], Optional[str]) -> Optional[str] fsid = None if daemon_type == 'osd': try: - fsid_file = os.path.join(args.data_dir, + fsid_file = os.path.join(ctx.args.data_dir, daemon_type, 'ceph-%s' % daemon_id, 'ceph_fsid') @@ -1881,8 +1932,8 @@ def get_legacy_daemon_fsid(cluster, daemon_type, daemon_id, legacy_dir=None): return fsid -def get_daemon_args(fsid, daemon_type, daemon_id): - # type: (str, str, Union[int, str]) -> List[str] +def get_daemon_args(ctx, fsid, daemon_type, daemon_id): + # type: (CephadmContext, str, str, Union[int, str]) -> List[str] r = list() # type: List[str] if daemon_type in Ceph.daemons and 
daemon_type != 'crash': @@ -1902,30 +1953,30 @@ def get_daemon_args(fsid, daemon_type, daemon_id): metadata = Monitoring.components[daemon_type] r += metadata.get('args', list()) if daemon_type == 'alertmanager': - config = get_parm(args.config_json) + config = get_parm(ctx.args.config_json) peers = config.get('peers', list()) # type: ignore for peer in peers: r += ["--cluster.peer={}".format(peer)] # some alertmanager, by default, look elsewhere for a config r += ["--config.file=/etc/alertmanager/alertmanager.yml"] elif daemon_type == NFSGanesha.daemon_type: - nfs_ganesha = NFSGanesha.init(fsid, daemon_id) + nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id) r += nfs_ganesha.get_daemon_args() elif daemon_type == HAproxy.daemon_type: - haproxy = HAproxy.init(fsid, daemon_id) + haproxy = HAproxy.init(ctx, fsid, daemon_id) r += haproxy.get_daemon_args() elif daemon_type == CustomContainer.daemon_type: - cc = CustomContainer.init(fsid, daemon_id) + cc = CustomContainer.init(ctx, fsid, daemon_id) r.extend(cc.get_daemon_args()) return r -def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid, +def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid, config=None, keyring=None): - # type: (str, str, Union[int, str], int, int, Optional[str], Optional[str]) -> None - data_dir = make_data_dir(fsid, daemon_type, daemon_id, uid=uid, gid=gid) - make_log_dir(fsid, uid=uid, gid=gid) + # type: (CephadmContext, str, str, Union[int, str], int, int, Optional[str], Optional[str]) -> None + data_dir = make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=uid, gid=gid) + make_log_dir(ctx, fsid, uid=uid, gid=gid) if config: config_path = os.path.join(data_dir, 'config') @@ -1942,27 +1993,30 @@ def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid, f.write(keyring) if daemon_type in Monitoring.components.keys(): - config_json: Dict[str, Any] = get_parm(args.config_json) + config_json: Dict[str, Any] = get_parm(ctx.args.config_json) required_files = 
Monitoring.components[daemon_type].get('config-json-files', list()) # Set up directories specific to the monitoring component config_dir = '' data_dir_root = '' if daemon_type == 'prometheus': - data_dir_root = get_data_dir(fsid, daemon_type, daemon_id) + data_dir_root = get_data_dir(fsid, ctx.args.data_dir, + daemon_type, daemon_id) config_dir = 'etc/prometheus' makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755) makedirs(os.path.join(data_dir_root, config_dir, 'alerting'), uid, gid, 0o755) makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755) elif daemon_type == 'grafana': - data_dir_root = get_data_dir(fsid, daemon_type, daemon_id) + data_dir_root = get_data_dir(fsid, ctx.args.data_dir, + daemon_type, daemon_id) config_dir = 'etc/grafana' makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755) makedirs(os.path.join(data_dir_root, config_dir, 'certs'), uid, gid, 0o755) makedirs(os.path.join(data_dir_root, config_dir, 'provisioning/datasources'), uid, gid, 0o755) makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755) elif daemon_type == 'alertmanager': - data_dir_root = get_data_dir(fsid, daemon_type, daemon_id) + data_dir_root = get_data_dir(fsid, ctx.args.data_dir, + daemon_type, daemon_id) config_dir = 'etc/alertmanager' makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755) makedirs(os.path.join(data_dir_root, config_dir, 'data'), uid, gid, 0o755) @@ -1977,23 +2031,23 @@ def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid, f.write(content) elif daemon_type == NFSGanesha.daemon_type: - nfs_ganesha = NFSGanesha.init(fsid, daemon_id) + nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id) nfs_ganesha.create_daemon_dirs(data_dir, uid, gid) elif daemon_type == CephIscsi.daemon_type: - ceph_iscsi = CephIscsi.init(fsid, daemon_id) + ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id) ceph_iscsi.create_daemon_dirs(data_dir, uid, gid) elif daemon_type == HAproxy.daemon_type: - haproxy = HAproxy.init(fsid, 
daemon_id) + haproxy = HAproxy.init(ctx, fsid, daemon_id) haproxy.create_daemon_dirs(data_dir, uid, gid) elif daemon_type == Keepalived.daemon_type: - keepalived = Keepalived.init(fsid, daemon_id) + keepalived = Keepalived.init(ctx, fsid, daemon_id) keepalived.create_daemon_dirs(data_dir, uid, gid) elif daemon_type == CustomContainer.daemon_type: - cc = CustomContainer.init(fsid, daemon_id) + cc = CustomContainer.init(ctx, fsid, daemon_id) cc.create_daemon_dirs(data_dir, uid, gid) @@ -2032,47 +2086,47 @@ def get_parm(option): return js -def get_config_and_keyring(): - # type: () -> Tuple[Optional[str], Optional[str]] +def get_config_and_keyring(ctx): + # type: (CephadmContext) -> Tuple[Optional[str], Optional[str]] config = None keyring = None - if 'config_json' in args and args.config_json: - d = get_parm(args.config_json) + if 'config_json' in ctx.args and ctx.args.config_json: + d = get_parm(ctx.args.config_json) config = d.get('config') keyring = d.get('keyring') - if 'config' in args and args.config: - with open(args.config, 'r') as f: + if 'config' in ctx.args and ctx.args.config: + with open(ctx.args.config, 'r') as f: config = f.read() - if 'key' in args and args.key: - keyring = '[%s]\n\tkey = %s\n' % (args.name, args.key) - elif 'keyring' in args and args.keyring: - with open(args.keyring, 'r') as f: + if 'key' in ctx.args and ctx.args.key: + keyring = '[%s]\n\tkey = %s\n' % (ctx.args.name, ctx.args.key) + elif 'keyring' in ctx.args and ctx.args.keyring: + with open(ctx.args.keyring, 'r') as f: keyring = f.read() return config, keyring -def get_container_binds(fsid, daemon_type, daemon_id): - # type: (str, str, Union[int, str, None]) -> List[List[str]] +def get_container_binds(ctx, fsid, daemon_type, daemon_id): + # type: (CephadmContext, str, str, Union[int, str, None]) -> List[List[str]] binds = list() if daemon_type == CephIscsi.daemon_type: binds.extend(CephIscsi.get_container_binds()) elif daemon_type == CustomContainer.daemon_type: assert daemon_id 
- cc = CustomContainer.init(fsid, daemon_id) - data_dir = get_data_dir(fsid, daemon_type, daemon_id) + cc = CustomContainer.init(ctx, fsid, daemon_id) + data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id) binds.extend(cc.get_container_binds(data_dir)) return binds -def get_container_mounts(fsid, daemon_type, daemon_id, +def get_container_mounts(ctx, fsid, daemon_type, daemon_id, no_config=False): - # type: (str, str, Union[int, str, None], Optional[bool]) -> Dict[str, str] + # type: (CephadmContext, str, str, Union[int, str, None], Optional[bool]) -> Dict[str, str] mounts = dict() if daemon_type in Ceph.daemons: @@ -2080,14 +2134,14 @@ def get_container_mounts(fsid, daemon_type, daemon_id, run_path = os.path.join('/var/run/ceph', fsid); if os.path.exists(run_path): mounts[run_path] = '/var/run/ceph:z' - log_dir = get_log_dir(fsid) + log_dir = get_log_dir(fsid, ctx.args.log_dir) mounts[log_dir] = '/var/log/ceph:z' crash_dir = '/var/lib/ceph/%s/crash' % fsid if os.path.exists(crash_dir): mounts[crash_dir] = '/var/lib/ceph/crash:z' if daemon_type in Ceph.daemons and daemon_id: - data_dir = get_data_dir(fsid, daemon_type, daemon_id) + data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id) if daemon_type == 'rgw': cdata_dir = '/var/lib/ceph/radosgw/ceph-rgw.%s' % (daemon_id) else: @@ -2109,8 +2163,8 @@ def get_container_mounts(fsid, daemon_type, daemon_id, mounts['/run/lock/lvm'] = '/run/lock/lvm' try: - if args.shared_ceph_folder: # make easy manager modules/ceph-volume development - ceph_folder = pathify(args.shared_ceph_folder) + if ctx.args.shared_ceph_folder: # make easy manager modules/ceph-volume development + ceph_folder = pathify(ctx.args.shared_ceph_folder) if os.path.exists(ceph_folder): mounts[ceph_folder + '/src/ceph-volume/ceph_volume'] = '/usr/lib/python3.6/site-packages/ceph_volume' mounts[ceph_folder + '/src/pybind/mgr'] = '/usr/share/ceph/mgr' @@ -2125,7 +2179,7 @@ def get_container_mounts(fsid, daemon_type, 
daemon_id, pass if daemon_type in Monitoring.components and daemon_id: - data_dir = get_data_dir(fsid, daemon_type, daemon_id) + data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id) if daemon_type == 'prometheus': mounts[os.path.join(data_dir, 'etc/prometheus')] = '/etc/prometheus:Z' mounts[os.path.join(data_dir, 'data')] = '/prometheus:Z' @@ -2142,36 +2196,37 @@ def get_container_mounts(fsid, daemon_type, daemon_id, if daemon_type == NFSGanesha.daemon_type: assert daemon_id - data_dir = get_data_dir(fsid, daemon_type, daemon_id) - nfs_ganesha = NFSGanesha.init(fsid, daemon_id) + data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id) + nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id) mounts.update(nfs_ganesha.get_container_mounts(data_dir)) if daemon_type == HAproxy.daemon_type: assert daemon_id - data_dir = get_data_dir(fsid, daemon_type, daemon_id) + data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id) mounts.update(HAproxy.get_container_mounts(data_dir)) if daemon_type == CephIscsi.daemon_type: assert daemon_id - data_dir = get_data_dir(fsid, daemon_type, daemon_id) - log_dir = get_log_dir(fsid) + data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id) + log_dir = get_log_dir(fsid, ctx.args.log_dir) mounts.update(CephIscsi.get_container_mounts(data_dir, log_dir)) if daemon_type == Keepalived.daemon_type: assert daemon_id - data_dir = get_data_dir(fsid, daemon_type, daemon_id) + data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id) mounts.update(Keepalived.get_container_mounts(data_dir)) if daemon_type == CustomContainer.daemon_type: assert daemon_id - cc = CustomContainer.init(fsid, daemon_id) - data_dir = get_data_dir(fsid, daemon_type, daemon_id) + cc = CustomContainer.init(ctx, fsid, daemon_id) + data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id) mounts.update(cc.get_container_mounts(data_dir)) return mounts -def get_container(fsid: str, daemon_type:
str, daemon_id: Union[int, str], +def get_container(ctx: CephadmContext, + fsid: str, daemon_type: str, daemon_id: Union[int, str], privileged: bool = False, ptrace: bool = False, container_args: Optional[List[str]] = None) -> 'CephContainer': @@ -2217,14 +2272,14 @@ def get_container(fsid: str, daemon_type: str, daemon_id: Union[int, str], # to configfs we need to make this a privileged container. privileged = True elif daemon_type == CustomContainer.daemon_type: - cc = CustomContainer.init(fsid, daemon_id) + cc = CustomContainer.init(ctx, fsid, daemon_id) entrypoint = cc.entrypoint host_network = False envs.extend(cc.get_container_envs()) container_args.extend(cc.get_container_args()) if daemon_type in Monitoring.components: - uid, gid = extract_uid_gid_monitoring(daemon_type) + uid, gid = extract_uid_gid_monitoring(ctx, daemon_type) monitoring_args = [ '--user', str(uid), @@ -2239,7 +2294,7 @@ def get_container(fsid: str, daemon_type: str, daemon_id: Union[int, str], # if using podman, set -d, --conmon-pidfile & --cidfile flags # so service can have Type=Forking - if 'podman' in container_path: + if 'podman' in ctx.container_path: runtime_dir = '/run' container_args.extend(['-d', '--conmon-pidfile', @@ -2248,26 +2303,27 @@ def get_container(fsid: str, daemon_type: str, daemon_id: Union[int, str], runtime_dir + '/ceph-%s@%s.%s.service-cid' % (fsid, daemon_type, daemon_id)]) return CephContainer( - image=args.image, + ctx, + image=ctx.args.image, entrypoint=entrypoint, - args=ceph_args + get_daemon_args(fsid, daemon_type, daemon_id), + args=ceph_args + get_daemon_args(ctx, fsid, daemon_type, daemon_id), container_args=container_args, - volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id), - bind_mounts=get_container_binds(fsid, daemon_type, daemon_id), + volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id), + bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id), cname='ceph-%s-%s.%s' % (fsid, daemon_type, daemon_id), 
envs=envs, privileged=privileged, ptrace=ptrace, - init=args.container_init, + init=ctx.args.container_init, host_network=host_network, ) -def extract_uid_gid(img='', file_path='/var/lib/ceph'): - # type: (str, Union[str, List[str]]) -> Tuple[int, int] +def extract_uid_gid(ctx, img='', file_path='/var/lib/ceph'): + # type: (CephadmContext, str, Union[str, List[str]]) -> Tuple[int, int] if not img: - img = args.image + img = ctx.args.image if isinstance(file_path, str): paths = [file_path] @@ -2277,6 +2333,7 @@ def extract_uid_gid(img='', file_path='/var/lib/ceph'): for fp in paths: try: out = CephContainer( + ctx, image=img, entrypoint='stat', args=['-c', '%u %g', fp] @@ -2288,18 +2345,18 @@ def extract_uid_gid(img='', file_path='/var/lib/ceph'): raise RuntimeError('uid/gid not found') -def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid, +def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid, config=None, keyring=None, osd_fsid=None, reconfig=False, ports=None): - # type: (str, str, Union[int, str], Optional[CephContainer], int, int, Optional[str], Optional[str], Optional[str], Optional[bool], Optional[List[int]]) -> None + # type: (CephadmContext, str, str, Union[int, str], Optional[CephContainer], int, int, Optional[str], Optional[str], Optional[str], Optional[bool], Optional[List[int]]) -> None ports = ports or [] - if any([port_in_use(port) for port in ports]): + if any([port_in_use(ctx, port) for port in ports]): raise Error("TCP Port(s) '{}' required for {} already in use".format(",".join(map(str, ports)), daemon_type)) - data_dir = get_data_dir(fsid, daemon_type, daemon_id) + data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id) if reconfig and not os.path.exists(data_dir): raise Error('cannot reconfig, data path %s does not exist' % data_dir) if daemon_type == 'mon' and not os.path.exists(data_dir): @@ -2312,18 +2369,19 @@ def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid, tmp_config = write_tmp(config, uid, 
gid) # --mkfs - create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid) - mon_dir = get_data_dir(fsid, 'mon', daemon_id) - log_dir = get_log_dir(fsid) + create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid) + mon_dir = get_data_dir(fsid, ctx.args.data_dir, 'mon', daemon_id) + log_dir = get_log_dir(fsid, ctx.args.log_dir) out = CephContainer( - image=args.image, + ctx, + image=ctx.args.image, entrypoint='/usr/bin/ceph-mon', args=['--mkfs', '-i', str(daemon_id), '--fsid', fsid, '-c', '/tmp/config', '--keyring', '/tmp/keyring', - ] + get_daemon_args(fsid, 'mon', daemon_id), + ] + get_daemon_args(ctx, fsid, 'mon', daemon_id), volume_mounts={ log_dir: '/var/log/ceph:z', mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (daemon_id), @@ -2340,6 +2398,7 @@ def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid, else: # dirs, conf, keyring create_daemon_dirs( + ctx, fsid, daemon_type, daemon_id, uid, gid, config, keyring) @@ -2348,18 +2407,18 @@ def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid, if daemon_type == CephadmDaemon.daemon_type: port = next(iter(ports), None) # get first tcp port provided or None - if args.config_json == '-': + if ctx.args.config_json == '-': config_js = get_parm('-') else: - config_js = get_parm(args.config_json) + config_js = get_parm(ctx.args.config_json) assert isinstance(config_js, dict) - cephadm_exporter = CephadmDaemon(fsid, daemon_id, port) + cephadm_exporter = CephadmDaemon(ctx, fsid, daemon_id, port) cephadm_exporter.deploy_daemon_unit(config_js) else: if c: - deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c, - osd_fsid=osd_fsid) + deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, + c, osd_fsid=osd_fsid) else: raise RuntimeError("attempting to deploy a daemon without a container image") @@ -2374,24 +2433,24 @@ def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid, os.fchmod(f.fileno(), 0o600) os.fchown(f.fileno(), uid, gid) - update_firewalld(daemon_type) + update_firewalld(ctx, 
daemon_type) # Open ports explicitly required for the daemon if ports: - fw = Firewalld() + fw = Firewalld(ctx) fw.open_ports(ports) fw.apply_rules() if reconfig and daemon_type not in Ceph.daemons: # ceph daemons do not need a restart; others (presumably) do to pick # up the new config - call_throws(['systemctl', 'reset-failed', + call_throws(ctx, ['systemctl', 'reset-failed', get_unit_name(fsid, daemon_type, daemon_id)]) - call_throws(['systemctl', 'restart', + call_throws(ctx, ['systemctl', 'restart', get_unit_name(fsid, daemon_type, daemon_id)]) -def _write_container_cmd_to_bash(file_obj, container, comment=None, background=False): - # type: (IO[str], CephContainer, Optional[str], Optional[bool]) -> None +def _write_container_cmd_to_bash(ctx, file_obj, container, comment=None, background=False): + # type: (CephadmContext, IO[str], CephContainer, Optional[str], Optional[bool]) -> None if comment: # Sometimes adding a comment, especially if there are multiple containers in one # unit file, makes it easier to read and grok. @@ -2399,19 +2458,19 @@ def _write_container_cmd_to_bash(file_obj, container, comment=None, background=F # Sometimes, adding `--rm` to a run_cmd doesn't work. Let's remove the container manually file_obj.write('! '+ ' '.join(container.rm_cmd()) + '2> /dev/null\n') # Sometimes, `podman rm` doesn't find the container. Then you'll have to add `--storage` - if 'podman' in container_path: + if 'podman' in ctx.container_path: file_obj.write('! 
'+ ' '.join(container.rm_cmd(storage=True)) + '2> /dev/null\n') # container run command file_obj.write(' '.join(container.run_cmd()) + (' &' if background else '') + '\n') -def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c, +def deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, c, enable=True, start=True, osd_fsid=None): - # type: (str, int, int, str, Union[int, str], CephContainer, bool, bool, Optional[str]) -> None + # type: (CephadmContext, str, int, int, str, Union[int, str], CephContainer, bool, bool, Optional[str]) -> None # cmd - data_dir = get_data_dir(fsid, daemon_type, daemon_id) + data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id) with open(data_dir + '/unit.run.new', 'w') as f: f.write('set -e\n') @@ -2432,7 +2491,8 @@ def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c, f.write('[ ! -L {p} ] || chown {uid}:{gid} {p}\n'.format(p=p, uid=uid, gid=gid)) else: prestart = CephContainer( - image=args.image, + ctx, + image=ctx.args.image, entrypoint='/usr/sbin/ceph-volume', args=[ 'lvm', 'activate', @@ -2440,23 +2500,23 @@ def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c, '--no-systemd' ], privileged=True, - volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id), - bind_mounts=get_container_binds(fsid, daemon_type, daemon_id), + volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id), + bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id), cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id), ) - _write_container_cmd_to_bash(f, prestart, 'LVM OSDs use ceph-volume lvm activate') + _write_container_cmd_to_bash(ctx, f, prestart, 'LVM OSDs use ceph-volume lvm activate') elif daemon_type == NFSGanesha.daemon_type: # add nfs to the rados grace db - nfs_ganesha = NFSGanesha.init(fsid, daemon_id) + nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id) prestart = nfs_ganesha.get_rados_grace_container('add') - _write_container_cmd_to_bash(f, 
prestart, 'add daemon to rados grace') + _write_container_cmd_to_bash(ctx, f, prestart, 'add daemon to rados grace') elif daemon_type == CephIscsi.daemon_type: f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n') - ceph_iscsi = CephIscsi.init(fsid, daemon_id) + ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id) tcmu_container = ceph_iscsi.get_tcmu_runner_container() - _write_container_cmd_to_bash(f, tcmu_container, 'iscsi tcmu-runnter container', background=True) + _write_container_cmd_to_bash(ctx, f, tcmu_container, 'iscsi tcmu-runnter container', background=True) - _write_container_cmd_to_bash(f, c, '%s.%s' % (daemon_type, str(daemon_id))) + _write_container_cmd_to_bash(ctx, f, c, '%s.%s' % (daemon_type, str(daemon_id))) os.fchmod(f.fileno(), 0o600) os.rename(data_dir + '/unit.run.new', data_dir + '/unit.run') @@ -2466,27 +2526,28 @@ def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c, if daemon_type == 'osd': assert osd_fsid poststop = CephContainer( - image=args.image, + ctx, + image=ctx.args.image, entrypoint='/usr/sbin/ceph-volume', args=[ 'lvm', 'deactivate', str(daemon_id), osd_fsid, ], privileged=True, - volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id), - bind_mounts=get_container_binds(fsid, daemon_type, daemon_id), + volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id), + bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id), cname='ceph-%s-%s.%s-deactivate' % (fsid, daemon_type, daemon_id), ) - _write_container_cmd_to_bash(f, poststop, 'deactivate osd') + _write_container_cmd_to_bash(ctx, f, poststop, 'deactivate osd') elif daemon_type == NFSGanesha.daemon_type: # remove nfs from the rados grace db - nfs_ganesha = NFSGanesha.init(fsid, daemon_id) + nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id) poststop = nfs_ganesha.get_rados_grace_container('remove') - _write_container_cmd_to_bash(f, poststop, 'remove daemon from rados grace') + _write_container_cmd_to_bash(ctx, 
f, poststop, 'remove daemon from rados grace') elif daemon_type == CephIscsi.daemon_type: # make sure we also stop the tcmu container - ceph_iscsi = CephIscsi.init(fsid, daemon_id) + ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id) tcmu_container = ceph_iscsi.get_tcmu_runner_container() f.write('! '+ ' '.join(tcmu_container.stop_cmd()) + '\n') f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=False)) + '\n') @@ -2502,30 +2563,31 @@ def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c, data_dir + '/unit.image') # systemd - install_base_units(fsid) - unit = get_unit_file(fsid) + install_base_units(ctx, fsid) + unit = get_unit_file(ctx, fsid) unit_file = 'ceph-%s@.service' % (fsid) - with open(args.unit_dir + '/' + unit_file + '.new', 'w') as f: + with open(ctx.args.unit_dir + '/' + unit_file + '.new', 'w') as f: f.write(unit) - os.rename(args.unit_dir + '/' + unit_file + '.new', - args.unit_dir + '/' + unit_file) - call_throws(['systemctl', 'daemon-reload']) + os.rename(ctx.args.unit_dir + '/' + unit_file + '.new', + ctx.args.unit_dir + '/' + unit_file) + call_throws(ctx, ['systemctl', 'daemon-reload']) unit_name = get_unit_name(fsid, daemon_type, daemon_id) - call(['systemctl', 'stop', unit_name], + call(ctx, ['systemctl', 'stop', unit_name], verbosity=CallVerbosity.DEBUG) - call(['systemctl', 'reset-failed', unit_name], + call(ctx, ['systemctl', 'reset-failed', unit_name], verbosity=CallVerbosity.DEBUG) if enable: - call_throws(['systemctl', 'enable', unit_name]) + call_throws(ctx, ['systemctl', 'enable', unit_name]) if start: - call_throws(['systemctl', 'start', unit_name]) + call_throws(ctx, ['systemctl', 'start', unit_name]) class Firewalld(object): - def __init__(self): - # type: () -> None + def __init__(self, ctx): + # type: (CephadmContext) -> None + self.ctx = ctx self.available = self.check() def check(self): @@ -2534,7 +2596,7 @@ class Firewalld(object): if not self.cmd: logger.debug('firewalld does not appear to be present') 
return False - (enabled, state, _) = check_unit('firewalld.service') + (enabled, state, _) = check_unit(self.ctx, 'firewalld.service') if not enabled: logger.debug('firewalld.service is not enabled') return False @@ -2563,10 +2625,10 @@ class Firewalld(object): if not self.cmd: raise RuntimeError("command not defined") - out, err, ret = call([self.cmd, '--permanent', '--query-service', svc], verbosity=CallVerbosity.DEBUG) + out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-service', svc], verbosity=CallVerbosity.DEBUG) if ret: logger.info('Enabling firewalld service %s in current zone...' % svc) - out, err, ret = call([self.cmd, '--permanent', '--add-service', svc]) + out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--add-service', svc]) if ret: raise RuntimeError( 'unable to add service %s to current zone: %s' % (svc, err)) @@ -2584,10 +2646,10 @@ class Firewalld(object): for port in fw_ports: tcp_port = str(port) + '/tcp' - out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG) + out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG) if ret: logger.info('Enabling firewalld port %s in current zone...' % tcp_port) - out, err, ret = call([self.cmd, '--permanent', '--add-port', tcp_port]) + out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--add-port', tcp_port]) if ret: raise RuntimeError('unable to add port %s to current zone: %s' % (tcp_port, err)) @@ -2605,10 +2667,10 @@ class Firewalld(object): for port in fw_ports: tcp_port = str(port) + '/tcp' - out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG) + out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG) if not ret: logger.info('Disabling port %s in current zone...' 
% tcp_port) - out, err, ret = call([self.cmd, '--permanent', '--remove-port', tcp_port]) + out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--remove-port', tcp_port]) if ret: raise RuntimeError('unable to remove port %s from current zone: %s' % (tcp_port, err)) @@ -2625,12 +2687,12 @@ class Firewalld(object): if not self.cmd: raise RuntimeError("command not defined") - call_throws([self.cmd, '--reload']) + call_throws(self.ctx, [self.cmd, '--reload']) -def update_firewalld(daemon_type): - # type: (str) -> None - firewall = Firewalld() +def update_firewalld(ctx, daemon_type): + # type: (CephadmContext, str) -> None + firewall = Firewalld(ctx) firewall.enable_service_for(daemon_type) @@ -2642,34 +2704,34 @@ def update_firewalld(daemon_type): firewall.open_ports(fw_ports) firewall.apply_rules() -def install_base_units(fsid): - # type: (str) -> None +def install_base_units(ctx, fsid): + # type: (CephadmContext, str) -> None """ Set up ceph.target and ceph-$fsid.target units. """ # global unit - existed = os.path.exists(args.unit_dir + '/ceph.target') - with open(args.unit_dir + '/ceph.target.new', 'w') as f: + existed = os.path.exists(ctx.args.unit_dir + '/ceph.target') + with open(ctx.args.unit_dir + '/ceph.target.new', 'w') as f: f.write('[Unit]\n' 'Description=All Ceph clusters and services\n' '\n' '[Install]\n' 'WantedBy=multi-user.target\n') - os.rename(args.unit_dir + '/ceph.target.new', - args.unit_dir + '/ceph.target') + os.rename(ctx.args.unit_dir + '/ceph.target.new', + ctx.args.unit_dir + '/ceph.target') if not existed: # we disable before enable in case a different ceph.target # (from the traditional package) is present; while newer # systemd is smart enough to disable the old # (/lib/systemd/...) and enable the new (/etc/systemd/...), # some older versions of systemd error out with EEXIST. 
- call_throws(['systemctl', 'disable', 'ceph.target']) - call_throws(['systemctl', 'enable', 'ceph.target']) - call_throws(['systemctl', 'start', 'ceph.target']) + call_throws(ctx, ['systemctl', 'disable', 'ceph.target']) + call_throws(ctx, ['systemctl', 'enable', 'ceph.target']) + call_throws(ctx, ['systemctl', 'start', 'ceph.target']) # cluster unit - existed = os.path.exists(args.unit_dir + '/ceph-%s.target' % fsid) - with open(args.unit_dir + '/ceph-%s.target.new' % fsid, 'w') as f: + existed = os.path.exists(ctx.args.unit_dir + '/ceph-%s.target' % fsid) + with open(ctx.args.unit_dir + '/ceph-%s.target.new' % fsid, 'w') as f: f.write('[Unit]\n' 'Description=Ceph cluster {fsid}\n' 'PartOf=ceph.target\n' @@ -2679,14 +2741,14 @@ def install_base_units(fsid): 'WantedBy=multi-user.target ceph.target\n'.format( fsid=fsid) ) - os.rename(args.unit_dir + '/ceph-%s.target.new' % fsid, - args.unit_dir + '/ceph-%s.target' % fsid) + os.rename(ctx.args.unit_dir + '/ceph-%s.target.new' % fsid, + ctx.args.unit_dir + '/ceph-%s.target' % fsid) if not existed: - call_throws(['systemctl', 'enable', 'ceph-%s.target' % fsid]) - call_throws(['systemctl', 'start', 'ceph-%s.target' % fsid]) + call_throws(ctx, ['systemctl', 'enable', 'ceph-%s.target' % fsid]) + call_throws(ctx, ['systemctl', 'start', 'ceph-%s.target' % fsid]) # logrotate for the cluster - with open(args.logrotate_dir + '/ceph-%s' % fsid, 'w') as f: + with open(ctx.args.logrotate_dir + '/ceph-%s' % fsid, 'w') as f: """ This is a bit sloppy in that the killall/pkill will touch all ceph daemons in all containers, but I don't see an elegant way to send SIGHUP *just* to @@ -2711,10 +2773,10 @@ def install_base_units(fsid): """ % fsid) -def get_unit_file(fsid): - # type: (str) -> str +def get_unit_file(ctx, fsid): + # type: (CephadmContext, str) -> str extra_args = '' - if 'podman' in container_path: + if 'podman' in ctx.container_path: extra_args = ('ExecStartPre=-/bin/rm -f /%t/%n-pid /%t/%n-cid\n' 'ExecStopPost=-/bin/rm -f 
/%t/%n-pid /%t/%n-cid\n' 'Type=forking\n' @@ -2752,9 +2814,9 @@ StartLimitBurst=5 [Install] WantedBy=ceph-{fsid}.target """.format( - container_path=container_path, + container_path=ctx.container_path, fsid=fsid, - data_dir=args.data_dir, + data_dir=ctx.args.data_dir, extra_args=extra_args) return u @@ -2764,6 +2826,7 @@ WantedBy=ceph-{fsid}.target class CephContainer: def __init__(self, + ctx: CephadmContext, image: str, entrypoint: str, args: List[str] = [], @@ -2777,6 +2840,7 @@ class CephContainer: init: bool = False, host_network: bool = True, ) -> None: + self.ctx = ctx self.image = image self.entrypoint = entrypoint self.args = args @@ -2792,13 +2856,14 @@ class CephContainer: def run_cmd(self) -> List[str]: cmd_args: List[str] = [ - str(container_path), + str(self.ctx.container_path), 'run', '--rm', '--ipc=host', ] - if 'podman' in container_path and os.path.exists('/etc/ceph/podman-auth.json'): + if 'podman' in self.ctx.container_path and \ + os.path.exists('/etc/ceph/podman-auth.json'): cmd_args.append('--authfile=/etc/ceph/podman-auth.json') envs: List[str] = [ @@ -2842,7 +2907,7 @@ class CephContainer: def shell_cmd(self, cmd: List[str]) -> List[str]: cmd_args: List[str] = [ - str(container_path), + str(self.ctx.container_path), 'run', '--rm', '--ipc=host', @@ -2880,7 +2945,7 @@ class CephContainer: def exec_cmd(self, cmd): # type: (List[str]) -> List[str] return [ - str(container_path), + str(self.ctx.container_path), 'exec', ] + self.container_args + [ self.cname, @@ -2889,7 +2954,7 @@ class CephContainer: def rm_cmd(self, storage=False): # type: (bool) -> List[str] ret = [ - str(container_path), + str(self.ctx.container_path), 'rm', '-f', ] if storage: @@ -2900,7 +2965,7 @@ class CephContainer: def stop_cmd(self): # type () -> List[str] ret = [ - str(container_path), + str(self.ctx.container_path), 'stop', self.cname, ] return ret @@ -2908,6 +2973,7 @@ class CephContainer: def run(self, timeout=DEFAULT_TIMEOUT): # type: (Optional[int]) -> str out, _, 
_ = call_throws( + self.ctx, self.run_cmd(), desc=self.entrypoint, timeout=timeout) return out @@ -2915,9 +2981,9 @@ class CephContainer: @infer_image -def command_version(): - # type: () -> int - out = CephContainer(args.image, 'ceph', ['--version']).run() +def command_version(ctx): + # type: (CephadmContext) -> int + out = CephContainer(ctx, ctx.args.image, 'ceph', ['--version']).run() print(out.strip()) return 0 @@ -2925,15 +2991,15 @@ def command_version(): @infer_image -def command_pull(): - # type: () -> int +def command_pull(ctx): + # type: (CephadmContext) -> int - _pull_image(args.image) - return command_inspect_image() + _pull_image(ctx, ctx.args.image) + return command_inspect_image(ctx) -def _pull_image(image): - # type: (str) -> None +def _pull_image(ctx, image): + # type: (CephadmContext, str) -> None logger.info('Pulling container image %s...' % image) ignorelist = [ @@ -2942,13 +3008,13 @@ def _pull_image(image): "Digest did not match, expected", ] - cmd = [container_path, 'pull', image] - if 'podman' in container_path and os.path.exists('/etc/ceph/podman-auth.json'): + cmd = [ctx.container_path, 'pull', image] + if 'podman' in ctx.container_path and os.path.exists('/etc/ceph/podman-auth.json'): cmd.append('--authfile=/etc/ceph/podman-auth.json') cmd_str = ' '.join(cmd) for sleep_secs in [1, 4, 25]: - out, err, ret = call(cmd) + out, err, ret = call(ctx, cmd) if not ret: return @@ -2963,17 +3029,17 @@ def _pull_image(image): @infer_image -def command_inspect_image(): - # type: () -> int - out, err, ret = call_throws([ - container_path, 'inspect', +def command_inspect_image(ctx): + # type: (CephadmContext) -> int + out, err, ret = call_throws(ctx, [ + ctx.container_path, 'inspect', '--format', '{{.ID}},{{json .RepoDigests}}', - args.image]) + ctx.args.image]) if ret: return errno.ENOENT - info_from = get_image_info_from_inspect(out.strip(), args.image) + info_from = get_image_info_from_inspect(out.strip(), ctx.args.image) - ver = 
CephContainer(args.image, 'ceph', ['--version']).run().strip() + ver = CephContainer(ctx, ctx.args.image, 'ceph', ['--version']).run().strip() info_from['ceph_version'] = ver print(json.dumps(info_from, indent=4, sort_keys=True)) @@ -3020,8 +3086,8 @@ def wrap_ipv6(address): return address -def is_ipv6(address): - # type: (str) -> bool +def is_ipv6(ctx, address): + # type: (CephadmContext, str) -> bool address = unwrap_ipv6(address) try: return ipaddress.ip_address(unicode(address)).version == 6 @@ -3031,20 +3097,23 @@ def is_ipv6(address): @default_image -def command_bootstrap(): - # type: () -> int +def command_bootstrap(ctx): + # type: (CephadmContext) -> int + + args = ctx.args - if not args.output_config: - args.output_config = os.path.join(args.output_dir, 'ceph.conf') - if not args.output_keyring: - args.output_keyring = os.path.join(args.output_dir, + if not ctx.args.output_config: + ctx.args.output_config = os.path.join(ctx.args.output_dir, 'ceph.conf') + if not ctx.args.output_keyring: + ctx.args.output_keyring = os.path.join(ctx.args.output_dir, 'ceph.client.admin.keyring') - if not args.output_pub_ssh_key: - args.output_pub_ssh_key = os.path.join(args.output_dir, 'ceph.pub') + if not ctx.args.output_pub_ssh_key: + ctx.args.output_pub_ssh_key = os.path.join(ctx.args.output_dir, 'ceph.pub') # verify output files - for f in [args.output_config, args.output_keyring, args.output_pub_ssh_key]: - if not args.allow_overwrite: + for f in [ctx.args.output_config, ctx.args.output_keyring, + ctx.args.output_pub_ssh_key]: + if not ctx.args.allow_overwrite: if os.path.exists(f): raise Error('%s already exists; delete or pass ' '--allow-overwrite to overwrite' % f) @@ -3059,51 +3128,51 @@ def command_bootstrap(): raise Error(f"Unable to create {dirname} due to permissions failure. 
Retry with root, or sudo or preallocate the directory.") - if not args.skip_prepare_host: - command_prepare_host() + if not ctx.args.skip_prepare_host: + command_prepare_host(ctx) else: logger.info('Skip prepare_host') # initial vars - fsid = args.fsid or make_fsid() + fsid = ctx.args.fsid or make_fsid() hostname = get_hostname() - if '.' in hostname and not args.allow_fqdn_hostname: + if '.' in hostname and not ctx.args.allow_fqdn_hostname: raise Error('hostname is a fully qualified domain name (%s); either fix (e.g., "sudo hostname %s" or similar) or pass --allow-fqdn-hostname' % (hostname, hostname.split('.')[0])) - mon_id = args.mon_id or hostname - mgr_id = args.mgr_id or generate_service_id() + mon_id = ctx.args.mon_id or hostname + mgr_id = ctx.args.mgr_id or generate_service_id() logger.info('Cluster fsid: %s' % fsid) ipv6 = False - l = FileLock(fsid) + l = FileLock(ctx, fsid) l.acquire() # ip r = re.compile(r':(\d+)$') base_ip = '' - if args.mon_ip: - ipv6 = is_ipv6(args.mon_ip) + if ctx.args.mon_ip: + ipv6 = is_ipv6(ctx, ctx.args.mon_ip) if ipv6: - args.mon_ip = wrap_ipv6(args.mon_ip) - hasport = r.findall(args.mon_ip) + ctx.args.mon_ip = wrap_ipv6(ctx.args.mon_ip) + hasport = r.findall(ctx.args.mon_ip) if hasport: port = int(hasport[0]) if port == 6789: - addr_arg = '[v1:%s]' % args.mon_ip + addr_arg = '[v1:%s]' % ctx.args.mon_ip elif port == 3300: - addr_arg = '[v2:%s]' % args.mon_ip + addr_arg = '[v2:%s]' % ctx.args.mon_ip else: logger.warning('Using msgr2 protocol for unrecognized port %d' % port) - addr_arg = '[v2:%s]' % args.mon_ip - base_ip = args.mon_ip[0:-(len(str(port)))-1] - check_ip_port(base_ip, port) + addr_arg = '[v2:%s]' % ctx.args.mon_ip + base_ip = ctx.args.mon_ip[0:-(len(str(port)))-1] + check_ip_port(ctx, base_ip, port) else: - base_ip = args.mon_ip - addr_arg = '[v2:%s:3300,v1:%s:6789]' % (args.mon_ip, args.mon_ip) - check_ip_port(args.mon_ip, 3300) - check_ip_port(args.mon_ip, 6789) - elif args.mon_addrv: - addr_arg = args.mon_addrv 
+ base_ip = ctx.args.mon_ip + addr_arg = '[v2:%s:3300,v1:%s:6789]' % (ctx.args.mon_ip, ctx.args.mon_ip) + check_ip_port(ctx, args.mon_ip, 3300) + check_ip_port(ctx, args.mon_ip, 6789) + elif ctx.args.mon_addrv: + addr_arg = ctx.args.mon_addrv if addr_arg[0] != '[' or addr_arg[-1] != ']': raise Error('--mon-addrv value %s must use square backets' % addr_arg) @@ -3117,16 +3186,16 @@ def command_bootstrap(): # strip off v1: or v2: prefix addr = re.sub(r'^\w+:', '', addr) base_ip = addr[0:-(len(str(port)))-1] - check_ip_port(base_ip, port) + check_ip_port(ctx, base_ip, port) else: raise Error('must specify --mon-ip or --mon-addrv') logger.debug('Base mon IP is %s, final addrv is %s' % (base_ip, addr_arg)) mon_network = None - if not args.skip_mon_network: + if not ctx.args.skip_mon_network: # make sure IP is configured locally, and then figure out the # CIDR network - for net, ips in list_networks().items(): + for net, ips in list_networks(ctx).items(): if ipaddress.ip_address(unicode(unwrap_ipv6(base_ip))) in \ [ipaddress.ip_address(unicode(ip)) for ip in ips]: mon_network = net @@ -3138,39 +3207,42 @@ def command_bootstrap(): '--skip-mon-network to configure it later' % base_ip) # config - cp = read_config(args.config) + cp = read_config(ctx.args.config) if not cp.has_section('global'): cp.add_section('global') cp.set('global', 'fsid', fsid); cp.set('global', 'mon host', addr_arg) - cp.set('global', 'container_image', args.image) + cp.set('global', 'container_image', ctx.args.image) cpf = StringIO() cp.write(cpf) config = cpf.getvalue() - if args.registry_json or args.registry_url: - command_registry_login() + if ctx.args.registry_json or ctx.args.registry_url: + command_registry_login(ctx) - if not args.skip_pull: - _pull_image(args.image) + if not ctx.args.skip_pull: + _pull_image(ctx, ctx.args.image) logger.info('Extracting ceph user uid/gid from container image...') - (uid, gid) = extract_uid_gid() + (uid, gid) = extract_uid_gid(ctx) # create some initial keys 
logger.info('Creating initial keys...') mon_key = CephContainer( - image=args.image, + ctx, + image=ctx.args.image, entrypoint='/usr/bin/ceph-authtool', args=['--gen-print-key'], ).run().strip() admin_key = CephContainer( - image=args.image, + ctx, + image=ctx.args.image, entrypoint='/usr/bin/ceph-authtool', args=['--gen-print-key'], ).run().strip() mgr_key = CephContainer( - image=args.image, + ctx, + image=ctx.args.image, entrypoint='/usr/bin/ceph-authtool', args=['--gen-print-key'], ).run().strip() @@ -3198,7 +3270,8 @@ def command_bootstrap(): logger.info('Creating initial monmap...') tmp_monmap = write_tmp('', 0, 0) out = CephContainer( - image=args.image, + ctx, + image=ctx.args.image, entrypoint='/usr/bin/monmaptool', args=['--create', '--clobber', @@ -3216,11 +3289,12 @@ def command_bootstrap(): # create mon logger.info('Creating mon...') - create_daemon_dirs(fsid, 'mon', mon_id, uid, gid) - mon_dir = get_data_dir(fsid, 'mon', mon_id) - log_dir = get_log_dir(fsid) + create_daemon_dirs(ctx, fsid, 'mon', mon_id, uid, gid) + mon_dir = get_data_dir(fsid, ctx.args.data_dir, 'mon', mon_id) + log_dir = get_log_dir(fsid, ctx.args.log_dir) out = CephContainer( - image=args.image, + ctx, + image=ctx.args.image, entrypoint='/usr/bin/ceph-mon', args=['--mkfs', '-i', mon_id, @@ -3228,7 +3302,7 @@ def command_bootstrap(): '-c', '/dev/null', '--monmap', '/tmp/monmap', '--keyring', '/tmp/keyring', - ] + get_daemon_args(fsid, 'mon', mon_id), + ] + get_daemon_args(ctx, fsid, 'mon', mon_id), volume_mounts={ log_dir: '/var/log/ceph:z', mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (mon_id), @@ -3242,9 +3316,9 @@ def command_bootstrap(): os.fchmod(f.fileno(), 0o600) f.write(config) - make_var_run(fsid, uid, gid) - mon_c = get_container(fsid, 'mon', mon_id) - deploy_daemon(fsid, 'mon', mon_id, mon_c, uid, gid, + make_var_run(ctx, fsid, uid, gid) + mon_c = get_container(ctx, fsid, 'mon', mon_id) + deploy_daemon(ctx, fsid, 'mon', mon_id, mon_c, uid, gid, config=None, keyring=None) # 
client.admin key + config to issue various CLI commands @@ -3265,7 +3339,8 @@ def command_bootstrap(): mounts[k] = v timeout = timeout or args.timeout return CephContainer( - image=args.image, + ctx, + image=ctx.args.image, entrypoint='/usr/bin/ceph', args=cmd, volume_mounts=mounts, @@ -3273,7 +3348,8 @@ def command_bootstrap(): logger.info('Waiting for mon to start...') c = CephContainer( - image=args.image, + ctx, + image=ctx.args.image, entrypoint='/usr/bin/ceph', args=[ 'status'], @@ -3287,15 +3363,15 @@ def command_bootstrap(): # wait for the service to become available def is_mon_available(): # type: () -> bool - timeout=args.timeout if args.timeout else 60 # seconds - out, err, ret = call(c.run_cmd(), + timeout=ctx.args.timeout if ctx.args.timeout else 60 # seconds + out, err, ret = call(ctx, c.run_cmd(), desc=c.entrypoint, timeout=timeout) return ret == 0 - is_available('mon', is_mon_available) + is_available(ctx, 'mon', is_mon_available) # assimilate and minimize config - if not args.no_minimize_config: + if not ctx.args.no_minimize_config: logger.info('Assimilating anything we can from ceph.conf...') cli([ 'config', 'assimilate-conf', @@ -3314,7 +3390,7 @@ def command_bootstrap(): with open(mon_dir + '/config', 'r') as f: config = f.read() logger.info('Restarting the monitor...') - call_throws([ + call_throws(ctx, [ 'systemctl', 'restart', get_unit_name(fsid, 'mon', mon_id) @@ -3331,27 +3407,27 @@ def command_bootstrap(): # create mgr logger.info('Creating mgr...') mgr_keyring = '[mgr.%s]\n\tkey = %s\n' % (mgr_id, mgr_key) - mgr_c = get_container(fsid, 'mgr', mgr_id) + mgr_c = get_container(ctx, fsid, 'mgr', mgr_id) # Note:the default port used by the Prometheus node exporter is opened in fw - deploy_daemon(fsid, 'mgr', mgr_id, mgr_c, uid, gid, + deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid, config=config, keyring=mgr_keyring, ports=[9283]) # output files - with open(args.output_keyring, 'w') as f: + with open(ctx.args.output_keyring, 'w') as 
f: os.fchmod(f.fileno(), 0o600) f.write('[client.admin]\n' '\tkey = ' + admin_key + '\n') - logger.info('Wrote keyring to %s' % args.output_keyring) + logger.info('Wrote keyring to %s' % ctx.args.output_keyring) - with open(args.output_config, 'w') as f: + with open(ctx.args.output_config, 'w') as f: f.write(config) - logger.info('Wrote config to %s' % args.output_config) + logger.info('Wrote config to %s' % ctx.args.output_config) # wait for the service to become available logger.info('Waiting for mgr to start...') def is_mgr_available(): # type: () -> bool - timeout=args.timeout if args.timeout else 60 # seconds + timeout=ctx.args.timeout if ctx.args.timeout else 60 # seconds try: out = cli(['status', '-f', 'json-pretty'], timeout=timeout) j = json.loads(out) @@ -3359,7 +3435,7 @@ def command_bootstrap(): except Exception as e: logger.debug('status failed: %s' % e) return False - is_available('mgr', is_mgr_available) + is_available(ctx, 'mgr', is_mgr_available) # wait for mgr to restart (after enabling a module) def wait_for_mgr_restart(): @@ -3378,12 +3454,12 @@ def command_bootstrap(): except Exception as e: logger.debug('tell mgr mgr_status failed: %s' % e) return False - is_available('mgr epoch %d' % epoch, mgr_has_latest_epoch) + is_available(ctx, 'mgr epoch %d' % epoch, mgr_has_latest_epoch) # ssh host: Optional[str] = None - if not args.skip_ssh: - cli(['config-key', 'set', 'mgr/cephadm/ssh_user', args.ssh_user]) + if not ctx.args.skip_ssh: + cli(['config-key', 'set', 'mgr/cephadm/ssh_user', ctx.args.ssh_user]) logger.info('Enabling cephadm module...') cli(['mgr', 'module', 'enable', 'cephadm']) @@ -3392,18 +3468,18 @@ def command_bootstrap(): logger.info('Setting orchestrator backend to cephadm...') cli(['orch', 'set', 'backend', 'cephadm']) - if args.ssh_config: + if ctx.args.ssh_config: logger.info('Using provided ssh config...') mounts = { - pathify(args.ssh_config.name): '/tmp/cephadm-ssh-config:z', + pathify(ctx.args.ssh_config.name): 
'/tmp/cephadm-ssh-config:z', } cli(['cephadm', 'set-ssh-config', '-i', '/tmp/cephadm-ssh-config'], extra_mounts=mounts) - if args.ssh_private_key and args.ssh_public_key: + if ctx.args.ssh_private_key and ctx.args.ssh_public_key: logger.info('Using provided ssh keys...') mounts = { - pathify(args.ssh_private_key.name): '/tmp/cephadm-ssh-key:z', - pathify(args.ssh_public_key.name): '/tmp/cephadm-ssh-key.pub:z' + pathify(ctx.args.ssh_private_key.name): '/tmp/cephadm-ssh-key:z', + pathify(ctx.args.ssh_public_key.name): '/tmp/cephadm-ssh-key.pub:z' } cli(['cephadm', 'set-priv-key', '-i', '/tmp/cephadm-ssh-key'], extra_mounts=mounts) cli(['cephadm', 'set-pub-key', '-i', '/tmp/cephadm-ssh-key.pub'], extra_mounts=mounts) @@ -3412,15 +3488,15 @@ def command_bootstrap(): cli(['cephadm', 'generate-key']) ssh_pub = cli(['cephadm', 'get-pub-key']) - with open(args.output_pub_ssh_key, 'w') as f: + with open(ctx.args.output_pub_ssh_key, 'w') as f: f.write(ssh_pub) - logger.info('Wrote public SSH key to to %s' % args.output_pub_ssh_key) + logger.info('Wrote public SSH key to to %s' % ctx.args.output_pub_ssh_key) - logger.info('Adding key to %s@localhost\'s authorized_keys...' % args.ssh_user) + logger.info('Adding key to %s@localhost\'s authorized_keys...' % ctx.args.ssh_user) try: - s_pwd = pwd.getpwnam(args.ssh_user) + s_pwd = pwd.getpwnam(ctx.args.ssh_user) except KeyError as e: - raise Error('Cannot find uid/gid for ssh-user: %s' % (args.ssh_user)) + raise Error('Cannot find uid/gid for ssh-user: %s' % (ctx.args.ssh_user)) ssh_uid = s_pwd.pw_uid ssh_gid = s_pwd.pw_gid ssh_dir = os.path.join(s_pwd.pw_dir, '.ssh') @@ -3453,29 +3529,29 @@ def command_bootstrap(): except RuntimeError as e: raise Error('Failed to add host <%s>: %s' % (host, e)) - if not args.orphan_initial_daemons: + if not ctx.args.orphan_initial_daemons: for t in ['mon', 'mgr', 'crash']: logger.info('Deploying %s service with default placement...' 
% t) cli(['orch', 'apply', t]) - if not args.skip_monitoring_stack: + if not ctx.args.skip_monitoring_stack: logger.info('Enabling mgr prometheus module...') cli(['mgr', 'module', 'enable', 'prometheus']) for t in ['prometheus', 'grafana', 'node-exporter', 'alertmanager']: logger.info('Deploying %s service with default placement...' % t) cli(['orch', 'apply', t]) - if args.registry_url and args.registry_username and args.registry_password: - cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_url', args.registry_url, '--force']) - cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_username', args.registry_username, '--force']) - cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_password', args.registry_password, '--force']) + if ctx.args.registry_url and ctx.args.registry_username and ctx.args.registry_password: + cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_url', ctx.args.registry_url, '--force']) + cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_username', ctx.args.registry_username, '--force']) + cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_password', ctx.args.registry_password, '--force']) - if args.container_init: - cli(['config', 'set', 'mgr', 'mgr/cephadm/container_init', str(args.container_init), '--force']) + if ctx.args.container_init: + cli(['config', 'set', 'mgr', 'mgr/cephadm/container_init', str(ctx.args.container_init), '--force']) - if args.with_exporter: + if ctx.args.with_exporter: cli(['config-key', 'set', 'mgr/cephadm/exporter_enabled', 'true']) - if args.exporter_config: + if ctx.args.exporter_config: logger.info("Applying custom cephadm exporter settings") # validated within the parser, so we can just apply to the store with tempfile.NamedTemporaryFile(buffering=0) as tmp: @@ -3495,10 +3571,10 @@ def command_bootstrap(): cli(['orch', 'apply', 'cephadm-exporter']) - if not args.skip_dashboard: + if not ctx.args.skip_dashboard: # Configure SSL port (cephadm only allows to configure dashboard SSL port) # if the user does not want 
to use SSL he can change this setting once the cluster is up - cli(["config", "set", "mgr", "mgr/dashboard/ssl_server_port" , str(args.ssl_dashboard_port)]) + cli(["config", "set", "mgr", "mgr/dashboard/ssl_server_port" , str(ctx.args.ssl_dashboard_port)]) # configuring dashboard parameters logger.info('Enabling the dashboard module...') @@ -3506,11 +3582,11 @@ def command_bootstrap(): wait_for_mgr_restart() # dashboard crt and key - if args.dashboard_key and args.dashboard_crt: + if ctx.args.dashboard_key and ctx.args.dashboard_crt: logger.info('Using provided dashboard certificate...') mounts = { - pathify(args.dashboard_crt.name): '/tmp/dashboard.crt:z', - pathify(args.dashboard_key.name): '/tmp/dashboard.key:z' + pathify(ctx.args.dashboard_crt.name): '/tmp/dashboard.crt:z', + pathify(ctx.args.dashboard_key.name): '/tmp/dashboard.key:z' } cli(['dashboard', 'set-ssl-certificate', '-i', '/tmp/dashboard.crt'], extra_mounts=mounts) cli(['dashboard', 'set-ssl-certificate-key', '-i', '/tmp/dashboard.key'], extra_mounts=mounts) @@ -3521,8 +3597,8 @@ def command_bootstrap(): logger.info('Creating initial admin user...') password = args.initial_dashboard_password or generate_password() tmp_password_file = write_tmp(password, uid, gid) - cmd = ['dashboard', 'ac-user-create', args.initial_dashboard_user, '-i', '/tmp/dashboard.pw', 'administrator', '--force-password'] - if not args.dashboard_password_noupdate: + cmd = ['dashboard', 'ac-user-create', ctx.args.initial_dashboard_user, '-i', '/tmp/dashboard.pw', 'administrator', '--force-password'] + if not ctx.args.dashboard_password_noupdate: cmd.append('--pwd-update-required') cli(cmd, extra_mounts={pathify(tmp_password_file.name): '/tmp/dashboard.pw:z'}) logger.info('Fetching dashboard port number...') @@ -3530,7 +3606,7 @@ def command_bootstrap(): port = int(out) # Open dashboard port - fw = Firewalld() + fw = Firewalld(ctx) fw.open_ports([port]) fw.apply_rules() @@ -3539,13 +3615,13 @@ def command_bootstrap(): '\t User: 
%s\n' '\tPassword: %s\n' % ( get_fqdn(), port, - args.initial_dashboard_user, + ctx.args.initial_dashboard_user, password)) - if args.apply_spec: - logger.info('Applying %s to cluster' % args.apply_spec) + if ctx.args.apply_spec: + logger.info('Applying %s to cluster' % ctx.args.apply_spec) - with open(args.apply_spec) as f: + with open(ctx.args.apply_spec) as f: for line in f: if 'hostname:' in line: line = line.replace('\n', '') @@ -3554,12 +3630,12 @@ def command_bootstrap(): logger.info('Adding ssh key to %s' % split[1]) ssh_key = '/etc/ceph/ceph.pub' - if args.ssh_public_key: - ssh_key = args.ssh_public_key.name - out, err, code = call_throws(['ssh-copy-id', '-f', '-i', ssh_key, '%s@%s' % (args.ssh_user, split[1])]) + if ctx.args.ssh_public_key: + ssh_key = ctx.args.ssh_public_key.name + out, err, code = call_throws(ctx, ['ssh-copy-id', '-f', '-i', ssh_key, '%s@%s' % (args.ssh_user, split[1])]) mounts = {} - mounts[pathify(args.apply_spec)] = '/tmp/spec.yml:z' + mounts[pathify(ctx.args.apply_spec)] = '/tmp/spec.yml:z' out = cli(['orch', 'apply', '-i', '/tmp/spec.yml'], extra_mounts=mounts) logger.info(out) @@ -3579,7 +3655,8 @@ def command_bootstrap(): ################################## -def command_registry_login(): +def command_registry_login(ctx: CephadmContext): + args = ctx.args if args.registry_json: logger.info("Pulling custom registry login info from %s." % args.registry_json) d = get_parm(args.registry_json) @@ -3587,7 +3664,7 @@ def command_registry_login(): args.registry_url = d.get('url') args.registry_username = d.get('username') args.registry_password = d.get('password') - registry_login(args.registry_url, args.registry_username, args.registry_password) + registry_login(ctx, args.registry_url, args.registry_username, args.registry_password) else: raise Error("json provided for custom registry login did not include all necessary fields. 
" "Please setup json file as\n" @@ -3597,52 +3674,54 @@ def command_registry_login(): " \"password\": \"REGISTRY_PASSWORD\"\n" "}\n") elif args.registry_url and args.registry_username and args.registry_password: - registry_login(args.registry_url, args.registry_username, args.registry_password) + registry_login(ctx, args.registry_url, args.registry_username, args.registry_password) else: raise Error("Invalid custom registry arguments received. To login to a custom registry include " "--registry-url, --registry-username and --registry-password " "options or --registry-json option") return 0 -def registry_login(url, username, password): +def registry_login(ctx: CephadmContext, url, username, password): logger.info("Logging into custom registry.") try: + container_path = ctx.container_path cmd = [container_path, 'login', '-u', username, '-p', password, url] if 'podman' in container_path: cmd.append('--authfile=/etc/ceph/podman-auth.json') - out, _, _ = call_throws(cmd) + out, _, _ = call_throws(ctx, cmd) if 'podman' in container_path: os.chmod('/etc/ceph/podman-auth.json', 0o600) except: - raise Error("Failed to login to custom registry @ %s as %s with given password" % (args.registry_url, args.registry_username)) + raise Error("Failed to login to custom registry @ %s as %s with given password" % (ctx.args.registry_url, ctx.args.registry_username)) ################################## -def extract_uid_gid_monitoring(daemon_type): - # type: (str) -> Tuple[int, int] +def extract_uid_gid_monitoring(ctx, daemon_type): + # type: (CephadmContext, str) -> Tuple[int, int] if daemon_type == 'prometheus': - uid, gid = extract_uid_gid(file_path='/etc/prometheus') + uid, gid = extract_uid_gid(ctx, file_path='/etc/prometheus') elif daemon_type == 'node-exporter': uid, gid = 65534, 65534 elif daemon_type == 'grafana': - uid, gid = extract_uid_gid(file_path='/var/lib/grafana') + uid, gid = extract_uid_gid(ctx, file_path='/var/lib/grafana') elif daemon_type == 'alertmanager': - uid, 
gid = extract_uid_gid(file_path=['/etc/alertmanager', '/etc/prometheus']) + uid, gid = extract_uid_gid(ctx, file_path=['/etc/alertmanager', '/etc/prometheus']) else: raise Error("{} not implemented yet".format(daemon_type)) return uid, gid @default_image -def command_deploy(): - # type: () -> None +def command_deploy(ctx): + # type: (CephadmContext) -> None + args = ctx.args daemon_type, daemon_id = args.name.split('.', 1) - l = FileLock(args.fsid) + l = FileLock(ctx, args.fsid) l.acquire() if daemon_type not in get_supported_daemons(): @@ -3650,7 +3729,7 @@ def command_deploy(): redeploy = False unit_name = get_unit_name(args.fsid, daemon_type, daemon_id) - (_, state, _) = check_unit(unit_name) + (_, state, _) = check_unit(ctx, unit_name) if state == 'running': redeploy = True @@ -3667,13 +3746,13 @@ def command_deploy(): daemon_ports = list(map(int, args.tcp_ports.split())) if daemon_type in Ceph.daemons: - config, keyring = get_config_and_keyring() - uid, gid = extract_uid_gid() - make_var_run(args.fsid, uid, gid) + config, keyring = get_config_and_keyring(ctx) + uid, gid = extract_uid_gid(ctx) + make_var_run(ctx, args.fsid, uid, gid) - c = get_container(args.fsid, daemon_type, daemon_id, + c = get_container(ctx, args.fsid, daemon_type, daemon_id, ptrace=args.allow_ptrace) - deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid, + deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid, gid, config=config, keyring=keyring, osd_fsid=args.osd_fsid, reconfig=args.reconfig, @@ -3698,9 +3777,9 @@ def command_deploy(): raise Error("{} deployment requires config-json which must " "contain arg for {}".format(daemon_type.capitalize(), ', '.join(required_args))) - uid, gid = extract_uid_gid_monitoring(daemon_type) - c = get_container(args.fsid, daemon_type, daemon_id) - deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid, + uid, gid = extract_uid_gid_monitoring(ctx, daemon_type) + c = get_container(ctx, args.fsid, daemon_type, daemon_id) + 
deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid, gid, reconfig=args.reconfig, ports=daemon_ports) @@ -3708,48 +3787,48 @@ def command_deploy(): if not args.reconfig and not redeploy: daemon_ports.extend(NFSGanesha.port_map.values()) - config, keyring = get_config_and_keyring() + config, keyring = get_config_and_keyring(ctx) # TODO: extract ganesha uid/gid (997, 994) ? - uid, gid = extract_uid_gid() - c = get_container(args.fsid, daemon_type, daemon_id) - deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid, + uid, gid = extract_uid_gid(ctx) + c = get_container(ctx, args.fsid, daemon_type, daemon_id) + deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid, gid, config=config, keyring=keyring, reconfig=args.reconfig, ports=daemon_ports) elif daemon_type == CephIscsi.daemon_type: - config, keyring = get_config_and_keyring() - uid, gid = extract_uid_gid() - c = get_container(args.fsid, daemon_type, daemon_id) - deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid, + config, keyring = get_config_and_keyring(ctx) + uid, gid = extract_uid_gid(ctx) + c = get_container(ctx, args.fsid, daemon_type, daemon_id) + deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid, gid, config=config, keyring=keyring, reconfig=args.reconfig, ports=daemon_ports) elif daemon_type == HAproxy.daemon_type: - haproxy = HAproxy.init(args.fsid, daemon_id) + haproxy = HAproxy.init(ctx, args.fsid, daemon_id) uid, gid = haproxy.extract_uid_gid_haproxy() - c = get_container(args.fsid, daemon_type, daemon_id) - deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid, + c = get_container(ctx, args.fsid, daemon_type, daemon_id) + deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid, gid, reconfig=args.reconfig, ports=daemon_ports) elif daemon_type == Keepalived.daemon_type: - keepalived = Keepalived.init(args.fsid, daemon_id) + keepalived = Keepalived.init(ctx, args.fsid, daemon_id) uid, gid = keepalived.extract_uid_gid_keepalived() - c = 
get_container(args.fsid, daemon_type, daemon_id) - deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid, + c = get_container(ctx, args.fsid, daemon_type, daemon_id) + deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid, gid, reconfig=args.reconfig, ports=daemon_ports) elif daemon_type == CustomContainer.daemon_type: - cc = CustomContainer.init(args.fsid, daemon_id) + cc = CustomContainer.init(ctx, args.fsid, daemon_id) if not args.reconfig and not redeploy: daemon_ports.extend(cc.ports) - c = get_container(args.fsid, daemon_type, daemon_id, + c = get_container(ctx, args.fsid, daemon_type, daemon_id, privileged=cc.privileged, ptrace=args.allow_ptrace) - deploy_daemon(args.fsid, daemon_type, daemon_id, c, + deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid=cc.uid, gid=cc.gid, config=None, keyring=None, reconfig=args.reconfig, ports=daemon_ports) @@ -3765,7 +3844,7 @@ def command_deploy(): CephadmDaemon.validate_config(config_js) - deploy_daemon(args.fsid, daemon_type, daemon_id, None, + deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, None, uid, gid, ports=daemon_ports) else: @@ -3776,12 +3855,13 @@ def command_deploy(): @infer_image -def command_run(): - # type: () -> int +def command_run(ctx): + # type: (CephadmContext) -> int + args = ctx.args (daemon_type, daemon_id) = args.name.split('.', 1) - c = get_container(args.fsid, daemon_type, daemon_id) + c = get_container(ctx, args.fsid, daemon_type, daemon_id) command = c.run_cmd() - return call_timeout(command, args.timeout) + return call_timeout(ctx, command, args.timeout) ################################## @@ -3789,10 +3869,11 @@ def command_run(): @infer_fsid @infer_config @infer_image -def command_shell(): - # type: () -> int +def command_shell(ctx): + # type: (CephadmContext) -> int + args = ctx.args if args.fsid: - make_log_dir(args.fsid) + make_log_dir(ctx, args.fsid) if args.name: if '.' 
in args.name: (daemon_type, daemon_id) = args.name.split('.', 1) @@ -3813,9 +3894,9 @@ def command_shell(): args.keyring = SHELL_DEFAULT_KEYRING container_args = [] # type: List[str] - mounts = get_container_mounts(args.fsid, daemon_type, daemon_id, + mounts = get_container_mounts(ctx, args.fsid, daemon_type, daemon_id, no_config=True if args.config else False) - binds = get_container_binds(args.fsid, daemon_type, daemon_id) + binds = get_container_binds(ctx, args.fsid, daemon_type, daemon_id) if args.config: mounts[pathify(args.config)] = '/etc/ceph/ceph.conf:z' if args.keyring: @@ -3852,6 +3933,7 @@ def command_shell(): mounts[home] = '/root' c = CephContainer( + ctx, image=args.image, entrypoint='doesnotmatter', args=[], @@ -3862,14 +3944,15 @@ def command_shell(): privileged=True) command = c.shell_cmd(command) - return call_timeout(command, args.timeout) + return call_timeout(ctx, command, args.timeout) ################################## @infer_fsid -def command_enter(): - # type: () -> int +def command_enter(ctx): + # type: (CephadmContext) -> int + args = ctx.args if not args.fsid: raise Error('must pass --fsid to specify cluster') (daemon_type, daemon_id) = args.name.split('.', 1) @@ -3884,34 +3967,36 @@ def command_enter(): '-e', "PS1=%s" % CUSTOM_PS1, ] c = CephContainer( + ctx, image=args.image, entrypoint='doesnotmatter', container_args=container_args, cname='ceph-%s-%s.%s' % (args.fsid, daemon_type, daemon_id), ) command = c.exec_cmd(command) - return call_timeout(command, args.timeout) + return call_timeout(ctx, command, args.timeout) ################################## @infer_fsid @infer_image -def command_ceph_volume(): - # type: () -> None +def command_ceph_volume(ctx): + # type: (CephadmContext) -> None + args = ctx.args if args.fsid: - make_log_dir(args.fsid) + make_log_dir(ctx, args.fsid) - l = FileLock(args.fsid) + l = FileLock(ctx, args.fsid) l.acquire() (uid, gid) = (0, 0) # ceph-volume runs as root - mounts = get_container_mounts(args.fsid, 
'osd', None) + mounts = get_container_mounts(ctx, args.fsid, 'osd', None) tmp_config = None tmp_keyring = None - (config, keyring) = get_config_and_keyring() + (config, keyring) = get_config_and_keyring(ctx) if config: # tmp config file @@ -3924,6 +4009,7 @@ def command_ceph_volume(): mounts[tmp_keyring.name] = '/var/lib/ceph/bootstrap-osd/ceph.keyring:z' c = CephContainer( + ctx, image=args.image, entrypoint='/usr/sbin/ceph-volume', envs=args.env, @@ -3931,8 +4017,8 @@ def command_ceph_volume(): privileged=True, volume_mounts=mounts, ) - verbosity = CallVerbosity.VERBOSE if args.log_output else CallVerbosity.VERBOSE_ON_FAILURE - out, err, code = call_throws(c.run_cmd(), verbosity=verbosity) + verbosity = CallVerbosity.VERBOSE if ctx.args.log_output else CallVerbosity.VERBOSE_ON_FAILURE + out, err, code = call_throws(ctx, c.run_cmd(), verbosity=verbosity) if not code: print(out) @@ -3940,14 +4026,15 @@ def command_ceph_volume(): @infer_fsid -def command_unit(): - # type: () -> None +def command_unit(ctx): + # type: (CephadmContext) -> None + args = ctx.args if not args.fsid: raise Error('must pass --fsid to specify cluster') - unit_name = get_unit_name_by_daemon_name(args.fsid, args.name) + unit_name = get_unit_name_by_daemon_name(ctx, args.fsid, args.name) - call_throws([ + call_throws(ctx, [ 'systemctl', args.command, unit_name], @@ -3959,12 +4046,13 @@ def command_unit(): @infer_fsid -def command_logs(): - # type: () -> None +def command_logs(ctx): + # type: (CephadmContext) -> None + args = ctx.args if not args.fsid: raise Error('must pass --fsid to specify cluster') - unit_name = get_unit_name_by_daemon_name(args.fsid, args.name) + unit_name = get_unit_name_by_daemon_name(ctx, args.fsid, args.name) cmd = [find_program('journalctl')] cmd.extend(['-u', unit_name]) @@ -3979,8 +4067,8 @@ def command_logs(): ################################## -def list_networks(): - # type: () -> Dict[str,List[str]] +def list_networks(ctx): + # type: (CephadmContext) -> 
Dict[str,List[str]] ## sadly, 18.04's iproute2 4.15.0-2ubun doesn't support the -j flag, ## so we'll need to use a regex to parse 'ip' command output. @@ -3988,16 +4076,16 @@ def list_networks(): #j = json.loads(out) #for x in j: - res = _list_ipv4_networks() - res.update(_list_ipv6_networks()) + res = _list_ipv4_networks(ctx) + res.update(_list_ipv6_networks(ctx)) return res -def _list_ipv4_networks(): +def _list_ipv4_networks(ctx: CephadmContext): execstr: Optional[str] = find_executable('ip') if not execstr: raise FileNotFoundError("unable to find 'ip' command") - out, _, _ = call_throws([execstr, 'route', 'ls']) + out, _, _ = call_throws(ctx, [execstr, 'route', 'ls']) return _parse_ipv4_route(out) @@ -4016,12 +4104,12 @@ def _parse_ipv4_route(out): return r -def _list_ipv6_networks(): +def _list_ipv6_networks(ctx: CephadmContext): execstr: Optional[str] = find_executable('ip') if not execstr: raise FileNotFoundError("unable to find 'ip' command") - routes, _, _ = call_throws([execstr, '-6', 'route', 'ls']) - ips, _, _ = call_throws([execstr, '-6', 'addr', 'ls']) + routes, _, _ = call_throws(ctx, [execstr, '-6', 'route', 'ls']) + ips, _, _ = call_throws(ctx, [execstr, '-6', 'addr', 'ls']) return _parse_ipv6_route(routes, ips) @@ -4051,26 +4139,28 @@ def _parse_ipv6_route(routes, ips): return r -def command_list_networks(): - # type: () -> None - r = list_networks() +def command_list_networks(ctx): + # type: (CephadmContext) -> None + r = list_networks(ctx) print(json.dumps(r, indent=4)) ################################## -def command_ls(): - # type: () -> None - - ls = list_daemons(detail=not args.no_detail, +def command_ls(ctx): + # type: (CephadmContext) -> None + args = ctx.args + ls = list_daemons(ctx, detail=not args.no_detail, legacy_dir=args.legacy_dir) print(json.dumps(ls, indent=4)) -def list_daemons(detail=True, legacy_dir=None): - # type: (bool, Optional[str]) -> List[Dict[str, str]] +def list_daemons(ctx, detail=True, legacy_dir=None): + # type: 
(CephadmContext, bool, Optional[str]) -> List[Dict[str, str]] host_version = None ls = [] + args = ctx.args + container_path = ctx.container_path data_dir = args.data_dir if legacy_dir is not None: @@ -4089,6 +4179,7 @@ def list_daemons(detail=True, legacy_dir=None): continue (cluster, daemon_id) = j.split('-', 1) fsid = get_legacy_daemon_fsid( + ctx, cluster, daemon_type, daemon_id, legacy_dir=legacy_dir) legacy_unit_name = 'ceph-%s@%s' % (daemon_type, daemon_id) @@ -4099,10 +4190,11 @@ def list_daemons(detail=True, legacy_dir=None): 'systemd_unit': legacy_unit_name, } if detail: - (val['enabled'], val['state'], _) = check_unit(legacy_unit_name) + (val['enabled'], val['state'], _) = \ + check_unit(ctx, legacy_unit_name) if not host_version: try: - out, err, code = call(['ceph', '-v']) + out, err, code = call(ctx, ['ceph', '-v']) if not code and out.startswith('ceph version '): host_version = out.split(' ')[2] except Exception: @@ -4128,19 +4220,21 @@ def list_daemons(detail=True, legacy_dir=None): } if detail: # get container id - (val['enabled'], val['state'], _) = check_unit(unit_name) + (val['enabled'], val['state'], _) = \ + check_unit(ctx, unit_name) container_id = None image_name = None image_id = None version = None start_stamp = None - if 'podman' in container_path and get_podman_version() < (1, 6, 2): + if 'podman' in container_path and \ + get_podman_version(ctx, container_path) < (1, 6, 2): image_field = '.ImageID' else: image_field = '.Image' - out, err, code = call( + out, err, code = call(ctx, [ container_path, 'inspect', '--format', '{{.Id}},{{.Config.Image}},{{%s}},{{.Created}},{{index .Config.Labels "io.ceph.version"}}' % image_field, @@ -4156,12 +4250,12 @@ def list_daemons(detail=True, legacy_dir=None): if not version or '.' 
not in version: version = seen_versions.get(image_id, None) if daemon_type == NFSGanesha.daemon_type: - version = NFSGanesha.get_version(container_id) + version = NFSGanesha.get_version(ctx,container_id) if daemon_type == CephIscsi.daemon_type: - version = CephIscsi.get_version(container_id) + version = CephIscsi.get_version(ctx,container_id) elif not version: if daemon_type in Ceph.daemons: - out, err, code = call( + out, err, code = call(ctx, [container_path, 'exec', container_id, 'ceph', '-v']) if not code and \ @@ -4169,7 +4263,7 @@ def list_daemons(detail=True, legacy_dir=None): version = out.split(' ')[2] seen_versions[image_id] = version elif daemon_type == 'grafana': - out, err, code = call( + out, err, code = call(ctx, [container_path, 'exec', container_id, 'grafana-server', '-v']) if not code and \ @@ -4180,7 +4274,7 @@ def list_daemons(detail=True, legacy_dir=None): 'alertmanager', 'node-exporter']: cmd = daemon_type.replace('-', '_') - out, err, code = call( + out, err, code = call(ctx, [container_path, 'exec', container_id, cmd, '--version']) if not code and \ @@ -4188,7 +4282,7 @@ def list_daemons(detail=True, legacy_dir=None): version = err.split(' ')[2] seen_versions[image_id] = version elif daemon_type == 'haproxy': - out, err, code = call( + out, err, code = call(ctx, [container_path, 'exec', container_id, 'haproxy', '-v']) if not code and \ @@ -4196,7 +4290,7 @@ def list_daemons(detail=True, legacy_dir=None): version = out.split(' ')[2] seen_versions[image_id] = version elif daemon_type == 'keepalived': - out, err, code = call( + out, err, code = call(ctx, [container_path, 'exec', container_id, 'keepalived', '--version']) if not code and \ @@ -4235,10 +4329,10 @@ def list_daemons(detail=True, legacy_dir=None): return ls -def get_daemon_description(fsid, name, detail=False, legacy_dir=None): - # type: (str, str, bool, Optional[str]) -> Dict[str, str] +def get_daemon_description(ctx, fsid, name, detail=False, legacy_dir=None): + # type: 
(CephadmContext, str, str, bool, Optional[str]) -> Dict[str, str] - for d in list_daemons(detail=detail, legacy_dir=legacy_dir): + for d in list_daemons(ctx, detail=detail, legacy_dir=legacy_dir): if d['fsid'] != fsid: continue if d['name'] != name: @@ -4250,11 +4344,12 @@ def get_daemon_description(fsid, name, detail=False, legacy_dir=None): ################################## @default_image -def command_adopt(): - # type: () -> None +def command_adopt(ctx): + # type: (CephadmContext) -> None + args = ctx.args if not args.skip_pull: - _pull_image(args.image) + _pull_image(ctx, args.image) (daemon_type, daemon_id) = args.name.split('.', 1) @@ -4263,33 +4358,35 @@ def command_adopt(): raise Error('adoption of style %s not implemented' % args.style) # lock - fsid = get_legacy_daemon_fsid(args.cluster, + fsid = get_legacy_daemon_fsid(ctx, + args.cluster, daemon_type, daemon_id, legacy_dir=args.legacy_dir) if not fsid: raise Error('could not detect legacy fsid; set fsid in ceph.conf') - l = FileLock(fsid) + l = FileLock(ctx, fsid) l.acquire() # call correct adoption if daemon_type in Ceph.daemons: - command_adopt_ceph(daemon_type, daemon_id, fsid); + command_adopt_ceph(ctx, daemon_type, daemon_id, fsid); elif daemon_type == 'prometheus': - command_adopt_prometheus(daemon_id, fsid) + command_adopt_prometheus(ctx, daemon_id, fsid) elif daemon_type == 'grafana': - command_adopt_grafana(daemon_id, fsid) + command_adopt_grafana(ctx, daemon_id, fsid) elif daemon_type == 'node-exporter': raise Error('adoption of node-exporter not implemented') elif daemon_type == 'alertmanager': - command_adopt_alertmanager(daemon_id, fsid) + command_adopt_alertmanager(ctx, daemon_id, fsid) else: raise Error('daemon type %s not recognized' % daemon_type) class AdoptOsd(object): - def __init__(self, osd_data_dir, osd_id): - # type: (str, str) -> None + def __init__(self, ctx, osd_data_dir, osd_id): + # type: (CephadmContext, str, str) -> None + self.ctx = ctx self.osd_data_dir = osd_data_dir 
self.osd_id = osd_id @@ -4315,16 +4412,17 @@ class AdoptOsd(object): def check_offline_lvm_osd(self): # type: () -> Tuple[Optional[str], Optional[str]] - + args = self.ctx.args osd_fsid, osd_type = None, None c = CephContainer( + self.ctx, image=args.image, entrypoint='/usr/sbin/ceph-volume', args=['lvm', 'list', '--format=json'], privileged=True ) - out, err, code = call_throws(c.run_cmd()) + out, err, code = call_throws(self.ctx, c.run_cmd()) if not code: try: js = json.loads(out) @@ -4345,7 +4443,6 @@ class AdoptOsd(object): def check_offline_simple_osd(self): # type: () -> Tuple[Optional[str], Optional[str]] - osd_fsid, osd_type = None, None osd_file = glob("/etc/ceph/osd/{}-[a-f0-9-]*.json".format(self.osd_id)) @@ -4359,17 +4456,19 @@ class AdoptOsd(object): if osd_type != "filestore": # need this to be mounted for the adopt to work, as it # needs to move files from this directory - call_throws(['mount', js["data"]["path"], self.osd_data_dir]) + call_throws(self.ctx, ['mount', js["data"]["path"], self.osd_data_dir]) except ValueError as e: logger.info("Invalid JSON in {}: {}".format(osd_file, e)) return osd_fsid, osd_type -def command_adopt_ceph(daemon_type, daemon_id, fsid): - # type: (str, str, str) -> None +def command_adopt_ceph(ctx, daemon_type, daemon_id, fsid): + # type: (CephadmContext, str, str, str) -> None + + args = ctx.args - (uid, gid) = extract_uid_gid() + (uid, gid) = extract_uid_gid(ctx) data_dir_src = ('/var/lib/ceph/%s/%s-%s' % (daemon_type, args.cluster, daemon_id)) @@ -4382,7 +4481,7 @@ def command_adopt_ceph(daemon_type, daemon_id, fsid): osd_fsid = None if daemon_type == 'osd': - adopt_osd = AdoptOsd(data_dir_src, daemon_id) + adopt_osd = AdoptOsd(ctx, data_dir_src, daemon_id) osd_fsid, osd_type = adopt_osd.check_online_osd() if not osd_fsid: osd_fsid, osd_type = adopt_osd.check_offline_lvm_osd() @@ -4399,28 +4498,28 @@ def command_adopt_ceph(daemon_type, daemon_id, fsid): # cluster we are adopting based on the 
/etc/{defaults,sysconfig}/ceph # CLUSTER field. unit_name = 'ceph-%s@%s' % (daemon_type, daemon_id) - (enabled, state, _) = check_unit(unit_name) + (enabled, state, _) = check_unit(ctx, unit_name) if state == 'running': logger.info('Stopping old systemd unit %s...' % unit_name) - call_throws(['systemctl', 'stop', unit_name]) + call_throws(ctx, ['systemctl', 'stop', unit_name]) if enabled: logger.info('Disabling old systemd unit %s...' % unit_name) - call_throws(['systemctl', 'disable', unit_name]) + call_throws(ctx, ['systemctl', 'disable', unit_name]) # data logger.info('Moving data...') - data_dir_dst = make_data_dir(fsid, daemon_type, daemon_id, + data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=uid, gid=gid) - move_files(glob(os.path.join(data_dir_src, '*')), + move_files(ctx, glob(os.path.join(data_dir_src, '*')), data_dir_dst, uid=uid, gid=gid) logger.debug('Remove dir \'%s\'' % (data_dir_src)) if os.path.ismount(data_dir_src): - call_throws(['umount', data_dir_src]) + call_throws(ctx, ['umount', data_dir_src]) os.rmdir(data_dir_src) logger.info('Chowning content...') - call_throws(['chown', '-c', '-R', '%d.%d' % (uid, gid), data_dir_dst]) + call_throws(ctx, ['chown', '-c', '-R', '%d.%d' % (uid, gid), data_dir_dst]) if daemon_type == 'mon': # rename *.ldb -> *.sst, in case they are coming from ubuntu @@ -4451,50 +4550,50 @@ def command_adopt_ceph(daemon_type, daemon_id, fsid): logger.info('Renaming %s -> %s', simple_fn, new_fn) os.rename(simple_fn, new_fn) logger.info('Disabling host unit ceph-volume@ simple unit...') - call(['systemctl', 'disable', + call(ctx, ['systemctl', 'disable', 'ceph-volume@simple-%s-%s.service' % (daemon_id, osd_fsid)]) else: # assume this is an 'lvm' c-v for now, but don't error # out if it's not. 
logger.info('Disabling host unit ceph-volume@ lvm unit...') - call(['systemctl', 'disable', + call(ctx, ['systemctl', 'disable', 'ceph-volume@lvm-%s-%s.service' % (daemon_id, osd_fsid)]) # config config_src = '/etc/ceph/%s.conf' % (args.cluster) config_src = os.path.abspath(args.legacy_dir + config_src) config_dst = os.path.join(data_dir_dst, 'config') - copy_files([config_src], config_dst, uid=uid, gid=gid) + copy_files(ctx, [config_src], config_dst, uid=uid, gid=gid) # logs logger.info('Moving logs...') log_dir_src = ('/var/log/ceph/%s-%s.%s.log*' % (args.cluster, daemon_type, daemon_id)) log_dir_src = os.path.abspath(args.legacy_dir + log_dir_src) - log_dir_dst = make_log_dir(fsid, uid=uid, gid=gid) - move_files(glob(log_dir_src), + log_dir_dst = make_log_dir(ctx, fsid, uid=uid, gid=gid) + move_files(ctx, glob(log_dir_src), log_dir_dst, uid=uid, gid=gid) logger.info('Creating new units...') - make_var_run(fsid, uid, gid) - c = get_container(fsid, daemon_type, daemon_id) - deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c, + make_var_run(ctx, fsid, uid, gid) + c = get_container(ctx, fsid, daemon_type, daemon_id) + deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, c, enable=True, # unconditionally enable the new unit start=(state == 'running' or args.force_start), osd_fsid=osd_fsid) - update_firewalld(daemon_type) + update_firewalld(ctx, daemon_type) -def command_adopt_prometheus(daemon_id, fsid): - # type: (str, str) -> None - +def command_adopt_prometheus(ctx, daemon_id, fsid): + # type: (CephadmContext, str, str) -> None + args = ctx.args daemon_type = 'prometheus' - (uid, gid) = extract_uid_gid_monitoring(daemon_type) + (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type) - _stop_and_disable('prometheus') + _stop_and_disable(ctx, 'prometheus') - data_dir_dst = make_data_dir(fsid, daemon_type, daemon_id, + data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=uid, gid=gid) # config @@ -4502,29 +4601,31 @@ def 
command_adopt_prometheus(daemon_id, fsid): config_src = os.path.abspath(args.legacy_dir + config_src) config_dst = os.path.join(data_dir_dst, 'etc/prometheus') makedirs(config_dst, uid, gid, 0o755) - copy_files([config_src], config_dst, uid=uid, gid=gid) + copy_files(ctx, [config_src], config_dst, uid=uid, gid=gid) # data data_src = '/var/lib/prometheus/metrics/' data_src = os.path.abspath(args.legacy_dir + data_src) data_dst = os.path.join(data_dir_dst, 'data') - copy_tree([data_src], data_dst, uid=uid, gid=gid) + copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid) + + make_var_run(ctx, fsid, uid, gid) + c = get_container(ctx, fsid, daemon_type, daemon_id) + deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid) + update_firewalld(ctx, daemon_type) - make_var_run(fsid, uid, gid) - c = get_container(fsid, daemon_type, daemon_id) - deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid) - update_firewalld(daemon_type) +def command_adopt_grafana(ctx, daemon_id, fsid): + # type: (CephadmContext, str, str) -> None -def command_adopt_grafana(daemon_id, fsid): - # type: (str, str) -> None + args = ctx.args daemon_type = 'grafana' - (uid, gid) = extract_uid_gid_monitoring(daemon_type) + (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type) - _stop_and_disable('grafana-server') + _stop_and_disable(ctx, 'grafana-server') - data_dir_dst = make_data_dir(fsid, daemon_type, daemon_id, + data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=uid, gid=gid) # config @@ -4532,12 +4633,12 @@ def command_adopt_grafana(daemon_id, fsid): config_src = os.path.abspath(args.legacy_dir + config_src) config_dst = os.path.join(data_dir_dst, 'etc/grafana') makedirs(config_dst, uid, gid, 0o755) - copy_files([config_src], config_dst, uid=uid, gid=gid) + copy_files(ctx, [config_src], config_dst, uid=uid, gid=gid) prov_src = '/etc/grafana/provisioning/' prov_src = os.path.abspath(args.legacy_dir + prov_src) prov_dst = os.path.join(data_dir_dst, 'etc/grafana') - 
copy_tree([prov_src], prov_dst, uid=uid, gid=gid) + copy_tree(ctx, [prov_src], prov_dst, uid=uid, gid=gid) # cert cert = '/etc/grafana/grafana.crt' @@ -4547,12 +4648,12 @@ def command_adopt_grafana(daemon_id, fsid): cert_src = os.path.abspath(args.legacy_dir + cert_src) makedirs(os.path.join(data_dir_dst, 'etc/grafana/certs'), uid, gid, 0o755) cert_dst = os.path.join(data_dir_dst, 'etc/grafana/certs/cert_file') - copy_files([cert_src], cert_dst, uid=uid, gid=gid) + copy_files(ctx, [cert_src], cert_dst, uid=uid, gid=gid) key_src = '/etc/grafana/grafana.key' key_src = os.path.abspath(args.legacy_dir + key_src) key_dst = os.path.join(data_dir_dst, 'etc/grafana/certs/cert_key') - copy_files([key_src], key_dst, uid=uid, gid=gid) + copy_files(ctx, [key_src], key_dst, uid=uid, gid=gid) _adjust_grafana_ini(os.path.join(config_dst, 'grafana.ini')) else: @@ -4562,23 +4663,24 @@ def command_adopt_grafana(daemon_id, fsid): data_src = '/var/lib/grafana/' data_src = os.path.abspath(args.legacy_dir + data_src) data_dst = os.path.join(data_dir_dst, 'data') - copy_tree([data_src], data_dst, uid=uid, gid=gid) + copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid) - make_var_run(fsid, uid, gid) - c = get_container(fsid, daemon_type, daemon_id) - deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid) - update_firewalld(daemon_type) + make_var_run(ctx, fsid, uid, gid) + c = get_container(ctx, fsid, daemon_type, daemon_id) + deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid) + update_firewalld(ctx, daemon_type) -def command_adopt_alertmanager(daemon_id, fsid): - # type: (str, str) -> None +def command_adopt_alertmanager(ctx, daemon_id, fsid): + # type: (CephadmContext, str, str) -> None + args = ctx.args daemon_type = 'alertmanager' - (uid, gid) = extract_uid_gid_monitoring(daemon_type) + (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type) - _stop_and_disable('prometheus-alertmanager') + _stop_and_disable(ctx, 'prometheus-alertmanager') - data_dir_dst = 
make_data_dir(fsid, daemon_type, daemon_id, + data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=uid, gid=gid) # config @@ -4586,18 +4688,18 @@ def command_adopt_alertmanager(daemon_id, fsid): config_src = os.path.abspath(args.legacy_dir + config_src) config_dst = os.path.join(data_dir_dst, 'etc/alertmanager') makedirs(config_dst, uid, gid, 0o755) - copy_files([config_src], config_dst, uid=uid, gid=gid) + copy_files(ctx, [config_src], config_dst, uid=uid, gid=gid) # data data_src = '/var/lib/prometheus/alertmanager/' data_src = os.path.abspath(args.legacy_dir + data_src) data_dst = os.path.join(data_dir_dst, 'etc/alertmanager/data') - copy_tree([data_src], data_dst, uid=uid, gid=gid) + copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid) - make_var_run(fsid, uid, gid) - c = get_container(fsid, daemon_type, daemon_id) - deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid) - update_firewalld(daemon_type) + make_var_run(ctx, fsid, uid, gid) + c = get_container(ctx, fsid, daemon_type, daemon_id) + deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid) + update_firewalld(ctx, daemon_type) def _adjust_grafana_ini(filename): @@ -4626,39 +4728,39 @@ def _adjust_grafana_ini(filename): raise Error("Cannot update {}: {}".format(filename, err)) -def _stop_and_disable(unit_name): - # type: (str) -> None +def _stop_and_disable(ctx, unit_name): + # type: (CephadmContext, str) -> None - (enabled, state, _) = check_unit(unit_name) + (enabled, state, _) = check_unit(ctx, unit_name) if state == 'running': logger.info('Stopping old systemd unit %s...' % unit_name) - call_throws(['systemctl', 'stop', unit_name]) + call_throws(ctx, ['systemctl', 'stop', unit_name]) if enabled: logger.info('Disabling old systemd unit %s...' 
% unit_name) - call_throws(['systemctl', 'disable', unit_name]) + call_throws(ctx, ['systemctl', 'disable', unit_name]) ################################## -def command_rm_daemon(): - # type: () -> None - - l = FileLock(args.fsid) +def command_rm_daemon(ctx): + # type: (CephadmContext) -> None + args = ctx.args + l = FileLock(ctx, args.fsid) l.acquire() (daemon_type, daemon_id) = args.name.split('.', 1) - unit_name = get_unit_name_by_daemon_name(args.fsid, args.name) + unit_name = get_unit_name_by_daemon_name(ctx, args.fsid, args.name) if daemon_type in ['mon', 'osd'] and not args.force: raise Error('must pass --force to proceed: ' 'this command may destroy precious data!') - call(['systemctl', 'stop', unit_name], + call(ctx, ['systemctl', 'stop', unit_name], verbosity=CallVerbosity.DEBUG) - call(['systemctl', 'reset-failed', unit_name], + call(ctx, ['systemctl', 'reset-failed', unit_name], verbosity=CallVerbosity.DEBUG) - call(['systemctl', 'disable', unit_name], + call(ctx, ['systemctl', 'disable', unit_name], verbosity=CallVerbosity.DEBUG) - data_dir = get_data_dir(args.fsid, daemon_type, daemon_id) + data_dir = get_data_dir(args.fsid, ctx.args.data_dir, daemon_type, daemon_id) if daemon_type in ['mon', 'osd', 'prometheus'] and \ not args.force_delete_data: # rename it out of the way -- do not delete @@ -4671,64 +4773,65 @@ def command_rm_daemon(): os.path.join(backup_dir, dirname)) else: if daemon_type == CephadmDaemon.daemon_type: - CephadmDaemon.uninstall(args.fsid, daemon_type, daemon_id) - call_throws(['rm', '-rf', data_dir]) + CephadmDaemon.uninstall(ctx, args.fsid, daemon_type, daemon_id) + call_throws(ctx, ['rm', '-rf', data_dir]) ################################## -def command_rm_cluster(): - # type: () -> None +def command_rm_cluster(ctx): + # type: (CephadmContext) -> None + args = ctx.args if not args.force: raise Error('must pass --force to proceed: ' 'this command may destroy precious data!') - l = FileLock(args.fsid) + l = FileLock(ctx, args.fsid) 
l.acquire() # stop + disable individual daemon units - for d in list_daemons(detail=False): + for d in list_daemons(ctx, detail=False): if d['fsid'] != args.fsid: continue if d['style'] != 'cephadm:v1': continue unit_name = get_unit_name(args.fsid, d['name']) - call(['systemctl', 'stop', unit_name], + call(ctx, ['systemctl', 'stop', unit_name], verbosity=CallVerbosity.DEBUG) - call(['systemctl', 'reset-failed', unit_name], + call(ctx, ['systemctl', 'reset-failed', unit_name], verbosity=CallVerbosity.DEBUG) - call(['systemctl', 'disable', unit_name], + call(ctx, ['systemctl', 'disable', unit_name], verbosity=CallVerbosity.DEBUG) # cluster units for unit_name in ['ceph-%s.target' % args.fsid]: - call(['systemctl', 'stop', unit_name], + call(ctx, ['systemctl', 'stop', unit_name], verbosity=CallVerbosity.DEBUG) - call(['systemctl', 'reset-failed', unit_name], + call(ctx, ['systemctl', 'reset-failed', unit_name], verbosity=CallVerbosity.DEBUG) - call(['systemctl', 'disable', unit_name], + call(ctx, ['systemctl', 'disable', unit_name], verbosity=CallVerbosity.DEBUG) slice_name = 'system-%s.slice' % (('ceph-%s' % args.fsid).replace('-', '\\x2d')) - call(['systemctl', 'stop', slice_name], + call(ctx, ['systemctl', 'stop', slice_name], verbosity=CallVerbosity.DEBUG) # rm units - call_throws(['rm', '-f', args.unit_dir + + call_throws(ctx, ['rm', '-f', args.unit_dir + '/ceph-%s@.service' % args.fsid]) - call_throws(['rm', '-f', args.unit_dir + + call_throws(ctx, ['rm', '-f', args.unit_dir + '/ceph-%s.target' % args.fsid]) - call_throws(['rm', '-rf', + call_throws(ctx, ['rm', '-rf', args.unit_dir + '/ceph-%s.target.wants' % args.fsid]) # rm data - call_throws(['rm', '-rf', args.data_dir + '/' + args.fsid]) + call_throws(ctx, ['rm', '-rf', args.data_dir + '/' + args.fsid]) # rm logs - call_throws(['rm', '-rf', args.log_dir + '/' + args.fsid]) - call_throws(['rm', '-rf', args.log_dir + + call_throws(ctx, ['rm', '-rf', args.log_dir + '/' + args.fsid]) + call_throws(ctx, ['rm', 
'-rf', args.log_dir + '/*.wants/ceph-%s@*' % args.fsid]) # rm logrotate config - call_throws(['rm', '-f', args.logrotate_dir + '/ceph-%s' % args.fsid]) + call_throws(ctx, ['rm', '-f', args.logrotate_dir + '/ceph-%s' % args.fsid]) # clean up config, keyring, and pub key files files = ['/etc/ceph/ceph.conf', '/etc/ceph/ceph.pub', '/etc/ceph/ceph.client.admin.keyring'] @@ -4746,8 +4849,8 @@ def command_rm_cluster(): ################################## -def check_time_sync(enabler=None): - # type: (Optional[Packager]) -> bool +def check_time_sync(ctx, enabler=None): + # type: (CephadmContext, Optional[Packager]) -> bool units = [ 'chrony.service', # 18.04 (at least) 'chronyd.service', # el / opensuse @@ -4756,15 +4859,15 @@ def check_time_sync(enabler=None): 'ntp.service', # 18.04 (at least) 'ntpsec.service', # 20.04 (at least) / buster ] - if not check_units(units, enabler): + if not check_units(ctx, units, enabler): logger.warning('No time sync service is running; checked for %s' % units) return False return True -def command_check_host(): - # type: () -> None - global container_path +def command_check_host(ctx: CephadmContext) -> None: + container_path = ctx.container_path + args = ctx.args errors = [] commands = ['systemctl', 'lvcreate'] @@ -4791,7 +4894,7 @@ def command_check_host(): errors.append('ERROR: %s binary does not appear to be installed' % command) # check for configured+running chronyd or ntp - if not check_time_sync(): + if not check_time_sync(ctx): errors.append('ERROR: No time synchronization is active') if 'expect_hostname' in args and args.expect_hostname: @@ -4809,38 +4912,40 @@ def command_check_host(): ################################## -def command_prepare_host(): - # type: () -> None +def command_prepare_host(ctx: CephadmContext) -> None: + args = ctx.args + container_path = ctx.container_path + logger.info('Verifying podman|docker is present...') pkg = None if not container_path: if not pkg: - pkg = create_packager() + pkg = 
create_packager(ctx) pkg.install_podman() logger.info('Verifying lvm2 is present...') if not find_executable('lvcreate'): if not pkg: - pkg = create_packager() + pkg = create_packager(ctx) pkg.install(['lvm2']) logger.info('Verifying time synchronization is in place...') - if not check_time_sync(): + if not check_time_sync(ctx): if not pkg: - pkg = create_packager() + pkg = create_packager(ctx) pkg.install(['chrony']) # check again, and this time try to enable # the service - check_time_sync(enabler=pkg) + check_time_sync(ctx, enabler=pkg) if 'expect_hostname' in args and args.expect_hostname and args.expect_hostname != get_hostname(): logger.warning('Adjusting hostname from %s -> %s...' % (get_hostname(), args.expect_hostname)) - call_throws(['hostname', args.expect_hostname]) + call_throws(ctx, ['hostname', args.expect_hostname]) with open('/etc/hostname', 'w') as f: f.write(args.expect_hostname + '\n') logger.info('Repeating the final host check...') - command_check_host() + command_check_host(ctx) ################################## @@ -4901,12 +5006,14 @@ def get_distro(): class Packager(object): - def __init__(self, stable=None, version=None, branch=None, commit=None): + def __init__(self, ctx: CephadmContext, + stable=None, version=None, branch=None, commit=None): assert \ (stable and not version and not branch and not commit) or \ (not stable and version and not branch and not commit) or \ (not stable and not version and branch) or \ (not stable and not version and not branch and not commit) + self.ctx = ctx self.stable = stable self.version = version self.branch = branch @@ -4943,6 +5050,7 @@ class Packager(object): return chacra_response.read().decode('utf-8') def repo_gpgkey(self): + args = self.ctx.args if args.gpg_url: return args.gpg_url if self.stable or self.version: @@ -4954,7 +5062,7 @@ class Packager(object): """ Start and enable the service (typically using systemd). 
""" - call_throws(['systemctl', 'enable', '--now', service]) + call_throws(self.ctx, ['systemctl', 'enable', '--now', service]) class Apt(Packager): @@ -4963,10 +5071,12 @@ class Apt(Packager): 'debian': 'debian', } - def __init__(self, stable, version, branch, commit, + def __init__(self, ctx: CephadmContext, + stable, version, branch, commit, distro, distro_version, distro_codename): - super(Apt, self).__init__(stable=stable, version=version, + super(Apt, self).__init__(ctx, stable=stable, version=version, branch=branch, commit=commit) + self.ctx = ctx self.distro = self.DISTRO_NAMES[distro] self.distro_codename = distro_codename self.distro_version = distro_version @@ -4975,6 +5085,8 @@ class Apt(Packager): return '/etc/apt/sources.list.d/ceph.list' def add_repo(self): + args = self.ctx.args + url, name = self.repo_gpgkey() logger.info('Installing repo GPG key from %s...' % url) try: @@ -5016,13 +5128,13 @@ class Apt(Packager): def install(self, ls): logger.info('Installing packages %s...' % ls) - call_throws(['apt', 'install', '-y'] + ls) + call_throws(self.ctx, ['apt', 'install', '-y'] + ls) def install_podman(self): if self.distro == 'ubuntu': logger.info('Setting up repo for podman...') self.add_kubic_repo() - call_throws(['apt', 'update']) + call_throws(self.ctx, ['apt', 'update']) logger.info('Attempting podman install...') try: @@ -5056,7 +5168,7 @@ class Apt(Packager): key = response.read().decode('utf-8') tmp_key = write_tmp(key, 0, 0) keyring = self.kubric_repo_gpgkey_path() - call_throws(['apt-key', '--keyring', keyring, 'add', tmp_key.name]) + call_throws(self.ctx, ['apt-key', '--keyring', keyring, 'add', tmp_key.name]) logger.info('Installing repo file at %s...' 
% self.kubic_repo_path()) content = 'deb %s /\n' % self.kubic_repo_url() @@ -5083,10 +5195,12 @@ class YumDnf(Packager): 'fedora': ('fedora', 'fc'), } - def __init__(self, stable, version, branch, commit, + def __init__(self, ctx: CephadmContext, + stable, version, branch, commit, distro, distro_version): - super(YumDnf, self).__init__(stable=stable, version=version, + super(YumDnf, self).__init__(ctx, stable=stable, version=version, branch=branch, commit=commit) + self.ctx = ctx self.major = int(distro_version.split('.')[0]) self.distro_normalized = self.DISTRO_NAMES[distro][0] self.distro_code = self.DISTRO_NAMES[distro][1] + str(self.major) @@ -5157,6 +5271,7 @@ class YumDnf(Packager): def repo_baseurl(self): assert self.stable or self.version + args = self.ctx.args if self.version: return '%s/rpm-%s/%s' % (args.repo_url, self.version, self.distro_code) @@ -5191,7 +5306,7 @@ class YumDnf(Packager): if self.distro_code.startswith('el'): logger.info('Enabling EPEL...') - call_throws([self.tool, 'install', '-y', 'epel-release']) + call_throws(self.ctx, [self.tool, 'install', '-y', 'epel-release']) def rm_repo(self): if os.path.exists(self.repo_path()): @@ -5199,7 +5314,7 @@ class YumDnf(Packager): def install(self, ls): logger.info('Installing packages %s...' 
% ls) - call_throws([self.tool, 'install', '-y'] + ls) + call_throws(self.ctx, [self.tool, 'install', '-y'] + ls) def install_podman(self): self.install(['podman']) @@ -5212,10 +5327,12 @@ class Zypper(Packager): 'opensuse-leap' ] - def __init__(self, stable, version, branch, commit, + def __init__(self, ctx: CephadmContext, + stable, version, branch, commit, distro, distro_version): - super(Zypper, self).__init__(stable=stable, version=version, + super(Zypper, self).__init__(ctx, stable=stable, version=version, branch=branch, commit=commit) + self.ctx = ctx self.tool = 'zypper' self.distro = 'opensuse' self.distro_version = '15.1' @@ -5256,6 +5373,7 @@ class Zypper(Packager): def repo_baseurl(self): assert self.stable or self.version + args = self.ctx.args if self.version: return '%s/rpm-%s/%s' % (args.repo_url, self.stable, self.distro) else: @@ -5292,31 +5410,33 @@ class Zypper(Packager): def install(self, ls): logger.info('Installing packages %s...' % ls) - call_throws([self.tool, 'in', '-y'] + ls) + call_throws(self.ctx, [self.tool, 'in', '-y'] + ls) def install_podman(self): self.install(['podman']) -def create_packager(stable=None, version=None, branch=None, commit=None): +def create_packager(ctx: CephadmContext, + stable=None, version=None, branch=None, commit=None): distro, distro_version, distro_codename = get_distro() if distro in YumDnf.DISTRO_NAMES: - return YumDnf(stable=stable, version=version, + return YumDnf(ctx, stable=stable, version=version, branch=branch, commit=commit, distro=distro, distro_version=distro_version) elif distro in Apt.DISTRO_NAMES: - return Apt(stable=stable, version=version, + return Apt(ctx, stable=stable, version=version, branch=branch, commit=commit, distro=distro, distro_version=distro_version, distro_codename=distro_codename) elif distro in Zypper.DISTRO_NAMES: - return Zypper(stable=stable, version=version, + return Zypper(ctx, stable=stable, version=version, branch=branch, commit=commit, distro=distro, 
distro_version=distro_version) raise Error('Distro %s version %s not supported' % (distro, distro_version)) -def command_add_repo(): +def command_add_repo(ctx: CephadmContext): + args = ctx.args if args.version and args.release: raise Error('you can specify either --release or --version but not both') if not args.version and not args.release and not args.dev and not args.dev_commit: @@ -5327,21 +5447,21 @@ def command_add_repo(): except Exception as e: raise Error('version must be in the form x.y.z (e.g., 15.2.0)') - pkg = create_packager(stable=args.release, + pkg = create_packager(ctx, stable=args.release, version=args.version, branch=args.dev, commit=args.dev_commit) pkg.add_repo() -def command_rm_repo(): - pkg = create_packager() +def command_rm_repo(ctx: CephadmContext): + pkg = create_packager(ctx) pkg.rm_repo() -def command_install(): - pkg = create_packager() - pkg.install(args.packages) +def command_install(ctx: CephadmContext): + pkg = create_packager(ctx) + pkg.install(ctx.args.packages) ################################## @@ -5449,7 +5569,8 @@ class HostFacts(): "0x1af4": "Virtio Block Device" } - def __init__(self): + def __init__(self, ctx: CephadmContext): + self.ctx = ctx self.cpu_model = 'Unknown' self.cpu_count = 0 self.cpu_cores = 0 @@ -5869,7 +5990,7 @@ class HostFacts(): """Get kernel parameters required/used in Ceph clusters""" k_param = {} - out, _, _ = call_throws(['sysctl', '-a'], verbosity=CallVerbosity.SILENT) + out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT) if out: param_list = out.split('\n') param_dict = { param.split(" = ")[0]:param.split(" = ")[-1] for param in param_list} @@ -5892,17 +6013,20 @@ class HostFacts(): ################################## -def command_gather_facts(): +def command_gather_facts(ctx: CephadmContext): """gather_facts is intended to provide host releated metadata to the caller""" - host = HostFacts() + host = HostFacts(ctx) print(host.dump()) 
################################## -def command_verify_prereqs(): +def command_verify_prereqs(ctx: CephadmContext): + args = ctx.args if args.service_type == 'haproxy' or args.service_type == 'keepalived': - out, err, code = call(['sysctl', '-n', 'net.ipv4.ip_nonlocal_bind']) + out, err, code = call( + ctx, ['sysctl', '-n', 'net.ipv4.ip_nonlocal_bind'] + ) if out.strip() != "1": raise Error('net.ipv4.ip_nonlocal_bind not set to 1') @@ -5976,6 +6100,7 @@ class CephadmDaemonHandler(BaseHTTPRequestHandler): f'/{api_version}/metadata/daemons', f'/{api_version}/metadata/host', ] + class Decorators: @classmethod def authorize(cls, f): @@ -6114,7 +6239,8 @@ class CephadmDaemon(): loop_delay = 1 thread_check_interval = 5 - def __init__(self, fsid, daemon_id=None, port=None): + def __init__(self, ctx: CephadmContext, fsid, daemon_id=None, port=None): + self.ctx = ctx self.fsid = fsid self.daemon_id = daemon_id if not port: @@ -6163,7 +6289,7 @@ class CephadmDaemon(): @property def port_active(self): - return port_in_use(self.port) + return port_in_use(self.ctx, self.port) @property def can_run(self): @@ -6189,7 +6315,7 @@ class CephadmDaemon(): @property def daemon_path(self): return os.path.join( - args.data_dir, + self.ctx.args.data_dir, self.fsid, f'{self.daemon_type}.{self.daemon_id}' ) @@ -6197,7 +6323,7 @@ class CephadmDaemon(): @property def binary_path(self): return os.path.join( - args.data_dir, + self.ctx.args.data_dir, self.fsid, CephadmDaemon.bin_name ) @@ -6233,7 +6359,7 @@ class CephadmDaemon(): s_time = time.time() try: - facts = HostFacts() + facts = HostFacts(self.ctx) except Exception as e: self._handle_thread_exception(e, 'host') exception_encountered = True @@ -6263,6 +6389,7 @@ class CephadmDaemon(): def _scrape_ceph_volume(self, refresh_interval=15): # we're invoking the ceph_volume command, so we need to set the args that it # expects to use + args = self.ctx.args args.command = "inventory --format=json".split() args.fsid = self.fsid args.log_output = 
False @@ -6282,7 +6409,7 @@ class CephadmDaemon(): stream = io.StringIO() try: with redirect_stdout(stream): - command_ceph_volume() + command_ceph_volume(self.ctx) except Exception as e: self._handle_thread_exception(e, 'disks') exception_encountered = True @@ -6335,7 +6462,7 @@ class CephadmDaemon(): try: # list daemons should ideally be invoked with a fsid - data = list_daemons() + data = list_daemons(self.ctx) except Exception as e: self._handle_thread_exception(e, 'daemons') exception_encountered = True @@ -6488,6 +6615,7 @@ WantedBy=ceph-{fsid}.target daemon since it's not a container, so we just create a simple service definition and add it to the fsid's target """ + args = self.ctx.args if not config: raise Error("Attempting to deploy cephadm daemon without a config") assert isinstance(config, dict) @@ -6513,15 +6641,16 @@ WantedBy=ceph-{fsid}.target os.path.join(args.unit_dir, f"{self.unit_name}.new"), os.path.join(args.unit_dir, self.unit_name)) - call_throws(['systemctl', 'daemon-reload']) - call(['systemctl', 'stop', self.unit_name], + call_throws(self.ctx, ['systemctl', 'daemon-reload']) + call(self.ctx, ['systemctl', 'stop', self.unit_name], verbosity=CallVerbosity.DEBUG) - call(['systemctl', 'reset-failed', self.unit_name], + call(self.ctx, ['systemctl', 'reset-failed', self.unit_name], verbosity=CallVerbosity.DEBUG) - call_throws(['systemctl', 'enable', '--now', self.unit_name]) + call_throws(self.ctx, ['systemctl', 'enable', '--now', self.unit_name]) @classmethod - def uninstall(cls, fsid, daemon_type, daemon_id): + def uninstall(cls, ctx: CephadmContext, fsid, daemon_type, daemon_id): + args = ctx.args unit_name = CephadmDaemon._unit_name(fsid, daemon_id) unit_path = os.path.join(args.unit_dir, unit_name) unit_run = os.path.join(args.data_dir, fsid, f"{daemon_type}.{daemon_id}", "unit.run") @@ -6544,23 +6673,23 @@ WantedBy=ceph-{fsid}.target break if port: - fw = Firewalld() + fw = Firewalld(ctx) try: fw.close_ports([port]) except RuntimeError: 
logger.error(f"Unable to close port {port}") - stdout, stderr, rc = call(["rm", "-f", unit_path]) + stdout, stderr, rc = call(ctx, ["rm", "-f", unit_path]) if rc: logger.error(f"Unable to remove the systemd file @ {unit_path}") else: logger.info(f"removed systemd unit file @ {unit_path}") - stdout, stderr, rc = call(["systemctl", "daemon-reload"]) + stdout, stderr, rc = call(ctx, ["systemctl", "daemon-reload"]) -def command_exporter(): - - exporter = CephadmDaemon(args.fsid, daemon_id=args.id, port=args.port) +def command_exporter(ctx: CephadmContext): + args = ctx.args + exporter = CephadmDaemon(ctx, args.fsid, daemon_id=args.id, port=args.port) if args.fsid not in os.listdir(args.data_dir): raise Error(f"cluster fsid '{args.fsid}' not found in '{args.data_dir}'") @@ -6582,7 +6711,8 @@ def systemd_target_state(target_name: str, subsystem: str = 'ceph') -> bool: @infer_fsid -def command_maintenance(): +def command_maintenance(ctx: CephadmContext): + args = ctx.args if not args.fsid: raise Error('must pass --fsid to specify cluster') @@ -6591,7 +6721,7 @@ def command_maintenance(): if args.maintenance_action.lower() == 'enter': logger.info("Requested to place host into maintenance") if systemd_target_state(target): - _out, _err, code = call( + _out, _err, code = call(ctx, ['systemctl', 'disable', target], verbosity=CallVerbosity.DEBUG ) @@ -6600,7 +6730,7 @@ def command_maintenance(): return "failed - to disable the target" else: # stopping a target waits by default - _out, _err, code = call( + _out, _err, code = call(ctx, ['systemctl', 'stop', target], verbosity=CallVerbosity.DEBUG ) @@ -6617,7 +6747,7 @@ def command_maintenance(): logger.info("Requested to exit maintenance state") # exit maintenance request if not systemd_target_state(target): - _out, _err, code = call( + _out, _err, code = call(ctx, ['systemctl', 'enable', target], verbosity=CallVerbosity.DEBUG ) @@ -6626,7 +6756,7 @@ def command_maintenance(): return "failed - unable to enable the target" else: 
# starting a target waits by default - _out, _err, code = call( + _out, _err, code = call(ctx, ['systemctl', 'start', target], verbosity=CallVerbosity.DEBUG ) @@ -7231,7 +7361,9 @@ def _parse_args(av): return args -if __name__ == "__main__": +def main(): + + global logger # root? if os.geteuid() != 0: @@ -7262,6 +7394,8 @@ if __name__ == "__main__": sys.stderr.write('No command specified; pass -h or --help for usage\n') sys.exit(1) + container_path = "" + # podman or docker? if args.func != command_check_host: if args.docker: @@ -7278,8 +7412,12 @@ if __name__ == "__main__": sys.stderr.write('Unable to locate any of %s\n' % CONTAINER_PREFERENCE) sys.exit(1) + ctx = CephadmContext() + ctx.args = args + ctx.container_path = container_path + try: - r = args.func() + r = args.func(ctx) except Error as e: if args.verbose: raise @@ -7288,3 +7426,6 @@ if __name__ == "__main__": if not r: r = 0 sys.exit(r) + +if __name__ == "__main__": + main() \ No newline at end of file -- 2.39.5