From: Sage Weil
Date: Tue, 11 Feb 2020 16:01:33 +0000 (-0600)
Subject: mgr/orch: service ls -> ps, add DaemonDescription
X-Git-Tag: v15.1.1~443^2~2
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=aacc9a650f052fd5be543e9265ec94833b8e8bb3;p=ceph-ci.git

mgr/orch: service ls -> ps, add DaemonDescription

- We keep ServiceDescription around unmodified (although it will need some cleanup later)
- We add DaemonDescription, and clean out the service-related ambiguities
- Add a new list_daemons() method for Orchestrator
- Add a new 'ceph orch ps' command
- In cephadm, drop get_services(), and implement list_daemons()
- a million changes to make this work
- Adjust health alert and option names

Signed-off-by: Sage Weil
---

diff --git a/doc/mgr/cephadm.rst b/doc/mgr/cephadm.rst
index 000cad5bbe7..739d7999680 100644
--- a/doc/mgr/cephadm.rst
+++ b/doc/mgr/cephadm.rst
@@ -49,7 +49,7 @@ CEPHADM_STRAY_HOST
 One or more hosts have running Ceph daemons but are not registered as
 hosts managed by *cephadm*.  This means that those services cannot
 currently be managed by cephadm (e.g., restarted, upgraded, included
-in `ceph orch service ls`).
+in `ceph orch ps`).
 
 You can manage the host(s) with::
 
   ceph
 
@@ -66,20 +66,20 @@ You can also disable this warning entirely with::
 
   ceph config set mgr mgr/cephadm/warn_on_stray_hosts false
 
-CEPHADM_STRAY_SERVICE
-^^^^^^^^^^^^^^^^^^^^^
+CEPHADM_STRAY_DAEMON
+^^^^^^^^^^^^^^^^^^^^
 
 One or more Ceph daemons are running but are not managed by
 *cephadm*, perhaps because they were deployed using a different tool,
 or were started manually.  This means that those services cannot
 currently be managed by cephadm (e.g., restarted, upgraded, included
-in `ceph orch service ls`).
+in `ceph orch ps`).
 
 **FIXME:** We need to implement and document an adopt procedure here.
 
 You can also disable this warning entirely with::
 
-  ceph config set mgr mgr/cephadm/warn_on_stray_services false
+  ceph config set mgr mgr/cephadm/warn_on_stray_daemons false
 
 CEPHADM_HOST_CHECK_FAILED
 ^^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/doc/mgr/orchestrator_cli.rst b/doc/mgr/orchestrator_cli.rst
index 4f8dac2b071..5d6d033331e 100644
--- a/doc/mgr/orchestrator_cli.rst
+++ b/doc/mgr/orchestrator_cli.rst
@@ -242,9 +242,10 @@ services of a particular type via optional --type parameter
 
 ::
 
+    ceph orch ps
     ceph orch service ls [--host host] [--svc_type type] [--refresh]
 
-Discover the status of a particular service::
+Discover the status of a particular service or daemon::
 
     ceph orch service ls --svc_type type --svc_id [--refresh]
 
@@ -305,7 +306,6 @@ This is an overview of the current implementation status of the orchestrators.
  daemon {stop,start,...}             ⚪     ✔
  device {ident,fault}-(on,off}       ⚪     ✔
  device ls                           ✔      ✔
- service ls                          ✔      ✔
  iscsi add                           ⚪     ⚪
  iscsi rm                            ⚪     ⚪
  iscsi update                        ⚪     ⚪
@@ -315,12 +315,14 @@ This is an overview of the current implementation status of the orchestrators.
nfs add ✔ ⚪ nfs rm ✔ ⚪ nfs update ✔ ⚪ + ps ⚪ ✔ rbd-mirror add ⚪ ✔ rbd-mirror rm ⚪ ✔ rbd-mirror update ⚪ ✔ rgw add ✔ ✔ rgw rm ✔ ✔ rgw update ⚪ ✔ + service ls ✔ ⚪ =================================== ====== ========= where diff --git a/qa/tasks/mgr/test_orchestrator_cli.py b/qa/tasks/mgr/test_orchestrator_cli.py index 3473896ab42..dbbe3d199b7 100644 --- a/qa/tasks/mgr/test_orchestrator_cli.py +++ b/qa/tasks/mgr/test_orchestrator_cli.py @@ -61,12 +61,12 @@ class TestOrchestratorCli(MgrTestCase): self.assertIn("localhost", ret) self.assertIsInstance(json.loads(ret), list) - def test_service_ls(self): - ret = self._orch_cmd("service", "ls") + def test_ps(self): + ret = self._orch_cmd("ps") self.assertIn("ceph-mgr", ret) - def test_service_ls_json(self): - ret = self._orch_cmd("service", "ls", "--format", "json") + def test_ps_json(self): + ret = self._orch_cmd("ps", "--format", "json") self.assertIsInstance(json.loads(ret), list) self.assertIn("ceph-mgr", ret) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index c062bb2ab09..76ce23ec475 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -82,7 +82,7 @@ except ImportError: def _name_to_entity_name(name): """ - Map from service names to ceph entity names (as seen in config) + Map from daemon names to ceph entity names (as seen in config) """ if name.startswith('rgw.') or name.startswith('rbd-mirror'): return 'client.' + name @@ -261,26 +261,26 @@ def trivial_result(val): return AsyncCompletion(value=val, name='trivial_result') -def with_services(service_type=None, - service_name=None, - service_id=None, - node_name=None, - refresh=False): +def with_daemons(daemon_type=None, + daemon_id=None, + service_name=None, + host=None, + refresh=False): def decorator(func): @wraps(func) def wrapper(self, *args, **kwargs): - def on_complete(services): + def on_complete(daemons): if kwargs: - kwargs['services'] = services + kwargs['daemons'] = daemons return func(self, *args, **kwargs) else: - args_ = args + (services,) + args_ = args + (daemons,) return func(self, *args_, **kwargs) - return self._get_services(service_type=service_type, - service_name=service_name, - service_id=service_id, - node_name=node_name, - refresh=refresh).then(on_complete) + return self._get_daemons(daemon_type=daemon_type, + daemon_id=daemon_id, + service_name=service_name, + host=host, + refresh=refresh).then(on_complete) return wrapper return decorator @@ -305,7 +305,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): 'desc': 'seconds to cache device inventory', }, { - 'name': 'service_cache_timeout', + 'name': 'daemon_cache_timeout', 'type': 'secs', 'default': 60, 'desc': 'seconds to cache service (daemon) inventory', @@ -327,14 +327,14 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): 'name': 'warn_on_stray_hosts', 'type': 'bool', 'default': True, - 'desc': 'raise a health warning if services are detected on a host ' + 'desc': 'raise a health warning if daemons are detected on a host ' 'that is not managed by cephadm', }, { - 'name': 'warn_on_stray_services', + 'name': 'warn_on_stray_daemons', 'type': 'bool', 'default': True, - 'desc': 'raise a health warning if services are detected ' + 'desc': 'raise a health warning if daemons are detected ' 'that are not managed by cephadm', }, { @@ -357,11 +357,11 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): if TYPE_CHECKING: self.ssh_config_file = None # type: Optional[str] 
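The with_daemons() decorator introduced above is the renamed with_services(): it resolves the daemon list first and then hands it to the wrapped method. Below is a minimal, synchronous sketch of that pattern, for illustration only: the real decorator chains AsyncCompletion objects with .then(), and FakeOrch plus the tuple-based daemon records are invented stand-ins rather than part of the patch:

    from functools import wraps

    def with_daemons(daemon_type=None):
        """Fetch matching daemons first, then pass them to the wrapped method."""
        def decorator(func):
            @wraps(func)
            def wrapper(self, *args, **kwargs):
                daemons = self._get_daemons(daemon_type=daemon_type)
                # mirror the real wrapper: append the daemon list to *args
                return func(self, *(args + (daemons,)), **kwargs)
            return wrapper
        return decorator

    class FakeOrch(object):
        def _get_daemons(self, daemon_type=None):
            known = [('osd', '0'), ('osd', '1'), ('mgr', 'x')]
            return [d for d in known if daemon_type in (None, d[0])]

        @with_daemons('osd')
        def remove_osds(self, osd_ids, daemons):
            return [d for d in daemons if d[1] in osd_ids]

    print(FakeOrch().remove_osds(['0']))   # -> [('osd', '0')]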
self.inventory_cache_timeout = 0 - self.service_cache_timeout = 0 + self.daemon_cache_timeout = 0 self.mode = '' self.container_image_base = '' self.warn_on_stray_hosts = True - self.warn_on_stray_services = True + self.warn_on_stray_daemons = True self.warn_on_failed_host_check = True self._cons = {} # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]] @@ -407,23 +407,23 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): self.inventory_cache = orchestrator.OutdatablePersistentDict( self, self._STORE_HOST_PREFIX + '.devices') - self.service_cache = orchestrator.OutdatablePersistentDict( - self, self._STORE_HOST_PREFIX + '.services') + self.daemon_cache = orchestrator.OutdatablePersistentDict( + self, self._STORE_HOST_PREFIX + '.daemons') # ensure the host lists are in sync for h in self.inventory.keys(): if h not in self.inventory_cache: self.log.debug('adding inventory item for %s' % h) self.inventory_cache[h] = orchestrator.OutdatableData() - if h not in self.service_cache: + if h not in self.daemon_cache: self.log.debug('adding service item for %s' % h) - self.service_cache[h] = orchestrator.OutdatableData() + self.daemon_cache[h] = orchestrator.OutdatableData() for h in self.inventory_cache: if h not in self.inventory: del self.inventory_cache[h] - for h in self.service_cache: + for h in self.daemon_cache: if h not in self.inventory: - del self.service_cache[h] + del self.daemon_cache[h] def shutdown(self): self.log.info('shutdown') @@ -440,25 +440,25 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): # only wait a little bit; the service might go away for something tries = 4 while tries > 0: - if s.service_type not in ['mon', 'osd', 'mds']: + if s.daemon_type not in ['mon', 'osd', 'mds']: break ret, out, err = self.mon_command({ - 'prefix': '%s ok-to-stop' % s.service_type, - 'ids': [s.service_instance], + 'prefix': '%s ok-to-stop' % s.daemon_type, + 'ids': [s.daemon_id], }) if not self.upgrade_state or self.upgrade_state.get('paused'): return False if ret: self.log.info('Upgrade: It is NOT safe to stop %s.%s' % - (s.service_type, s.service_instance)) + (s.daemon_type, s.daemon_id)) time.sleep(15) tries -= 1 else: self.log.info('Upgrade: It is safe to stop %s.%s' % - (s.service_type, s.service_instance)) + (s.daemon_type, s.daemon_id)) return True self.log.info('Upgrade: It is safe to stop %s.%s' % - (s.service_type, s.service_instance)) + (s.daemon_type, s.daemon_id)) return True def _clear_upgrade_health_checks(self): @@ -478,7 +478,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): self.set_health_checks(self.health_checks) def _do_upgrade(self, daemons): - # type: (List[orchestrator.ServiceDescription]) -> Optional[AsyncCompletion] + # type: (List[orchestrator.DaemonDescription]) -> Optional[AsyncCompletion] if not self.upgrade_state: self.log.debug('_do_upgrade no state, exiting') return None @@ -520,22 +520,22 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): self.log.info('Upgrade: Checking %s daemons...' 
% daemon_type) need_upgrade_self = False for d in daemons: - if d.service_type != daemon_type: + if d.daemon_type != daemon_type: continue if not d.container_image_id: self.log.debug('daemon %s.%s image_id is not known' % ( - daemon_type, d.service_instance)) + daemon_type, d.daemon_id)) return None if d.container_image_id == target_id: self.log.debug('daemon %s.%s version correct' % ( - daemon_type, d.service_instance)) + daemon_type, d.daemon_id)) continue self.log.debug('daemon %s.%s version incorrect (%s, %s)' % ( - daemon_type, d.service_instance, + daemon_type, d.daemon_id, d.container_image_id, d.version)) if daemon_type == 'mgr' and \ - d.service_instance == self.get_mgr_id(): + d.daemon_id == self.get_mgr_id(): self.log.info('Upgrade: Need to upgrade myself (mgr.%s)' % self.get_mgr_id()) need_upgrade_self = True @@ -572,16 +572,16 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): if not self._wait_for_ok_to_stop(d): return None self.log.info('Upgrade: Redeploying %s.%s' % - (d.service_type, d.service_instance)) + (d.daemon_type, d.daemon_id)) ret, out, err = self.mon_command({ 'prefix': 'config set', 'name': 'container_image', 'value': target_name, - 'who': daemon_type + '.' + d.service_instance, + 'who': daemon_type + '.' + d.daemon_id, }) return self._daemon_action([( - d.service_type, - d.service_instance, + d.daemon_type, + d.daemon_id, d.nodename, 'redeploy' )]) @@ -702,33 +702,32 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): } self.set_health_checks(self.health_checks) - def _check_for_strays(self, services): + def _check_for_strays(self, daemons): self.log.debug('_check_for_strays') for k in ['CEPHADM_STRAY_HOST', - 'CEPHADM_STRAY_SERVICE']: + 'CEPHADM_STRAY_DAEMON']: if k in self.health_checks: del self.health_checks[k] - if self.warn_on_stray_hosts or self.warn_on_stray_services: + if self.warn_on_stray_hosts or self.warn_on_stray_daemons: ls = self.list_servers() managed = [] - for s in services: + for s in daemons: managed.append(s.name()) - self.log.debug('cephadm daemons %s' % managed) host_detail = [] # type: List[str] - host_num_services = 0 - service_detail = [] # type: List[str] + host_num_daemons = 0 + daemon_detail = [] # type: List[str] for item in ls: host = item.get('hostname') - services = item.get('services') + daemons = item.get('services') # misnomer! 
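The _do_upgrade() hunk above compares each daemon's container_image_id against the target image and only redeploys mismatches, upgrading the active mgr itself last; _upgrade_check() further down builds a report the same way. A rough standalone sketch of that comparison (the dicts stand in for DaemonDescription objects, and upgrade_check() here is an illustrative helper, not the module's method):

    def upgrade_check(daemons, target_id):
        """Split daemons into up-to-date and needs-update by container image id."""
        report = {'up_to_date': [], 'needs_update': {}}
        for d in daemons:
            if d['container_image_id'] == target_id:
                report['up_to_date'].append(d['name'])
            else:
                report['needs_update'][d['name']] = {
                    'current_id': d['container_image_id'],
                    'target_id': target_id,
                }
        return report

    daemons = [{'name': 'mgr.x', 'container_image_id': 'abc'},
               {'name': 'osd.0', 'container_image_id': 'old'}]
    print(upgrade_check(daemons, 'abc'))   # mgr.x up to date, osd.0 needs update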
missing_names = [] - for s in services: + for s in daemons: name = '%s.%s' % (s.get('type'), s.get('id')) if host not in self.inventory: missing_names.append(name) - host_num_services += 1 + host_num_daemons += 1 if name not in managed: - service_detail.append( - 'stray service %s on host %s not managed by cephadm' % (name, host)) + daemon_detail.append( + 'stray daemon %s on host %s not managed by cephadm' % (name, host)) if missing_names: host_detail.append( 'stray host %s has %d stray daemons: %s' % ( @@ -736,19 +735,19 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): if host_detail: self.health_checks['CEPHADM_STRAY_HOST'] = { 'severity': 'warning', - 'summary': '%d stray host(s) with %s service(s) ' + 'summary': '%d stray host(s) with %s daemon(s) ' 'not managed by cephadm' % ( - len(host_detail), host_num_services), + len(host_detail), host_num_daemons), 'count': len(host_detail), 'detail': host_detail, } - if service_detail: - self.health_checks['CEPHADM_STRAY_SERVICE'] = { + if daemon_detail: + self.health_checks['CEPHADM_STRAY_DAEMON'] = { 'severity': 'warning', - 'summary': '%d stray service(s) not managed by cephadm' % ( - len(service_detail)), - 'count': len(service_detail), - 'detail': service_detail, + 'summary': '%d stray daemons(s) not managed by cephadm' % ( + len(daemon_detail)), + 'count': len(daemon_detail), + 'detail': daemon_detail, } self.set_health_checks(self.health_checks) @@ -764,15 +763,15 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): while self.run: self._check_hosts() - # refresh services - self.log.debug('refreshing services') - completion = self._get_services(maybe_refresh=True) + # refresh daemons + self.log.debug('refreshing daemons') + completion = self._get_daemons(maybe_refresh=True) self._orchestrator_wait([completion]) # FIXME: this is a band-aid to avoid crashing the mgr, but what # we really need to do here is raise health alerts for individual # hosts that fail and continue with the ones that do not fail. 
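_check_for_strays() above cross-references the daemons cephadm manages with what list_servers() reports (whose entries are still keyed 'services'), and raises CEPHADM_STRAY_HOST and CEPHADM_STRAY_DAEMON for anything unaccounted for. A rough, self-contained sketch of that comparison; find_strays() and the literal structures are illustrative only:

    def find_strays(servers, managed_names, known_hosts):
        host_detail, daemon_detail = [], []
        for item in servers:
            host = item.get('hostname')
            missing = []
            for d in item.get('services', []):    # daemons, despite the key
                name = '%s.%s' % (d['type'], d['id'])
                if host not in known_hosts:
                    missing.append(name)
                if name not in managed_names:
                    daemon_detail.append(
                        'stray daemon %s on host %s not managed by cephadm' % (name, host))
            if missing:
                host_detail.append(
                    'stray host %s has %d stray daemons: %s' % (host, len(missing), missing))
        return host_detail, daemon_detail

    servers = [{'hostname': 'node2',
                'services': [{'type': 'mon', 'id': 'node2'}]}]
    print(find_strays(servers, {'mon.node1'}, {'node1'}))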
if completion.exception is not None: - self.log.error('failed to refresh services: %s' % completion.exception) + self.log.error('failed to refresh daemons: %s' % completion.exception) self.health_checks['CEPHADM_REFRESH_FAILED'] = { 'severity': 'warning', 'summary': 'failed to probe one or more hosts', @@ -785,13 +784,13 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): if 'CEPHADM_REFRESH_FAILED' in self.health_checks: del self.health_checks['CEPHADM_REFRESH_FAILED'] self.set_health_checks(self.health_checks) - services = completion.result - self.log.debug('services %s' % services) + daemons = completion.result + self.log.debug('daemons %s' % daemons) - self._check_for_strays(services) + self._check_for_strays(daemons) if self.upgrade_state and not self.upgrade_state.get('paused'): - completion = self._do_upgrade(services) + completion = self._do_upgrade(daemons) if completion: while not completion.has_result: self.process([completion]) @@ -827,12 +826,12 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): pass def get_unique_name(self, host, existing, prefix=None, forcename=None): - # type: (str, List[orchestrator.ServiceDescription], Optional[str], Optional[str]) -> str + # type: (str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str """ Generate a unique random service name """ if forcename: - if len([d for d in existing if d.service_instance == forcename]): + if len([d for d in existing if d.daemon_id == forcename]): raise RuntimeError('specified name %s already in use', forcename) return forcename @@ -845,7 +844,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): name = '' name += host + '.' + ''.join(random.choice(string.ascii_lowercase) for _ in range(6)) - if len([d for d in existing if d.service_instance == name]): + if len([d for d in existing if d.daemon_id == name]): self.log('name %s exists, trying again', name) continue return name @@ -1201,7 +1200,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): } self._save_inventory() self.inventory_cache[spec.hostname] = orchestrator.OutdatableData() - self.service_cache[spec.hostname] = orchestrator.OutdatableData() + self.daemon_cache[spec.hostname] = orchestrator.OutdatableData() self.event.set() # refresh stray health check return "Added host '{}'".format(spec.hostname) @@ -1215,7 +1214,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): del self.inventory[host] self._save_inventory() del self.inventory_cache[host] - del self.service_cache[host] + del self.daemon_cache[host] self._reset_con(host) self.event.set() # refresh stray health check return "Removed host '{}'".format(host) @@ -1276,58 +1275,56 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): return 'Removed label %s from host %s' % (label, host) @async_map_completion - def _refresh_host_services(self, host): + def _refresh_host_daemons(self, host): out, err, code = self._run_cephadm( host, 'mon', 'ls', [], no_fsid=True) data = json.loads(''.join(out)) for d in data: d['last_refresh'] = datetime.datetime.utcnow().strftime(DATEFMT) - self.log.debug('Refreshed host %s services: %s' % (host, data)) - self.service_cache[host] = orchestrator.OutdatableData(data) + self.log.debug('Refreshed host %s daemons: %s' % (host, data)) + self.daemon_cache[host] = orchestrator.OutdatableData(data) return host, data - def _get_services(self, - service_type=None, - service_name=None, - 
service_id=None, - node_name=None, - refresh=False, - maybe_refresh=False): + def _get_daemons(self, + daemon_type=None, + daemon_id=None, + service_name=None, + host=None, + refresh=False, + maybe_refresh=False): hosts = [] wait_for_args = [] - services = {} + daemons = {} keys = None - if node_name is not None: - keys = [node_name] - for host, host_info in self.service_cache.items_filtered(keys): + if host is not None: + keys = [host] + for host, host_info in self.daemon_cache.items_filtered(keys): hosts.append(host) if refresh: - self.log.info("refreshing services for '{}'".format(host)) + self.log.info("refreshing daemons for '{}'".format(host)) wait_for_args.append((host,)) - elif maybe_refresh and host_info.outdated(self.service_cache_timeout): # type: ignore - self.log.info("refreshing stale services for '{}'".format(host)) + elif maybe_refresh and host_info.outdated(self.daemon_cache_timeout): # type: ignore + self.log.info("refreshing stale daemons for '{}'".format(host)) wait_for_args.append((host,)) elif not host_info.last_refresh: - services[host] = [ + daemons[host] = [ { - 'name': '*', - 'service_type': '*', - 'service_instance': '*', + 'name': '*.*', 'style': 'cephadm:v1', 'fsid': self._cluster_fsid, }, ] else: - self.log.debug('have recent services for %s: %s' % ( + self.log.debug('have recent daemons for %s: %s' % ( host, host_info.data)) - services[host] = host_info.data + daemons[host] = host_info.data - def _get_services_result(results): + def _get_daemons_result(results): for host, data in results: - services[host] = data + daemons[host] = data result = [] - for host, ls in services.items(): + for host, ls in daemons.items(): for d in ls: if not d['style'].startswith('cephadm'): self.log.debug('ignoring non-cephadm on %s: %s' % (host, d)) @@ -1336,20 +1333,20 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): self.log.debug('ignoring foreign daemon on %s: %s' % (host, d)) continue self.log.debug('including %s %s' % (host, d)) - sd = orchestrator.ServiceDescription() + sd = orchestrator.DaemonDescription() if 'last_refresh' in d: sd.last_refresh = datetime.datetime.strptime( d['last_refresh'], DATEFMT) - sd.service_type = d['name'].split('.')[0] - if service_type and service_type != sd.service_type: + if '.' not in d['name']: + self.log.debug('ignoring dot-less daemon on %s: %s' % (host, d)) continue - if '.' 
in d['name']: - sd.service_instance = '.'.join(d['name'].split('.')[1:]) - elif d['name'] != '*': - sd.service_instance = host - if service_id and service_id != sd.service_instance: + sd.daemon_type = d['name'].split('.')[0] + if daemon_type and daemon_type != sd.daemon_type: continue - if service_name and not sd.service_instance.startswith(service_name + '.'): + sd.daemon_id = '.'.join(d['name'].split('.')[1:]) + if daemon_id and daemon_id != sd.daemon_id: + continue + if service_name and not sd.daemon_id.startswith(service_name + '.'): continue sd.nodename = host sd.container_id = d.get('container_id') @@ -1371,20 +1368,28 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): return result if wait_for_args: - return self._refresh_host_services(wait_for_args).then( - _get_services_result) + return self._refresh_host_daemons(wait_for_args).then( + _get_daemons_result) else: - return trivial_result(_get_services_result({})) - - def describe_service(self, service_type=None, service_id=None, - node_name=None, refresh=False): - if service_type not in ("mds", "osd", "mgr", "mon", 'rgw', "nfs", None): - raise orchestrator.OrchestratorValidationError( - service_type + " unsupported") - result = self._get_services(service_type, - service_id=service_id, - node_name=node_name, - refresh=refresh) + return trivial_result(_get_daemons_result({})) + +# def describe_service(self, service_type=None, service_id=None, +# node_name=None, refresh=False): +# if service_type not in ("mds", "osd", "mgr", "mon", 'rgw', "nfs", None): +# raise orchestrator.OrchestratorValidationError( +# service_type + " unsupported") +# result = self._get_daemons(service_type, +# service_id=service_id, +# node_name=node_name, +# refresh=refresh) +# return result + + def list_daemons(self, daemon_type=None, daemon_id=None, + host=None, refresh=False): + result = self._get_daemons(daemon_type=daemon_type, + daemon_id=daemon_id, + host=host, + refresh=refresh) return result def service_action(self, action, service_type, service_name): @@ -1394,7 +1399,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): def _proc_daemons(daemons): args = [] for d in daemons: - args.append((d.service_type, d.service_instance, + args.append((d.daemon_type, d.daemon_id, d.nodename, action)) if not args: raise orchestrator.OrchestratorError( @@ -1402,7 +1407,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): service_type, service_name)) return self._daemon_action(args) - return self._get_services( + return self._get_daemons( service_type, service_name=service_name).then(_proc_daemons) @@ -1426,7 +1431,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): host, name, 'unit', ['--name', name, a], error_ok=True) - self.service_cache.invalidate(host) + self.daemon_cache.invalidate(host) self.log.debug('_daemon_action code %s out %s' % (code, out)) return "{} {} from host '{}'".format(action, name, host) @@ -1437,7 +1442,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): def _proc_daemons(daemons): args = [] for d in daemons: - args.append((d.service_type, d.service_instance, + args.append((d.daemon_type, d.daemon_id, d.nodename, action)) if not args: raise orchestrator.OrchestratorError( @@ -1445,9 +1450,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): daemon_type, daemon_id)) return self._daemon_action(args) - return self._get_services( - service_type=daemon_type, - 
service_id=daemon_id).then(_proc_daemons) + return self._get_daemons( + daemon_type=daemon_type, + daemon_id=daemon_id).then(_proc_daemons) def get_inventory(self, node_filter=None, refresh=False): """ @@ -1629,11 +1634,11 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): return "Created osd(s) on host '{}'".format(host) - @with_services('osd') - def remove_osds(self, osd_ids, services): - # type: (List[str], List[orchestrator.ServiceDescription]) -> AsyncCompletion - args = [(d.name(), d.nodename) for d in services if - d.service_instance in osd_ids] + @with_daemons('osd') + def remove_osds(self, osd_ids, daemons): + # type: (List[str], List[orchestrator.DaemonDescription]) -> AsyncCompletion + args = [(d.name(), d.nodename) for d in daemons if + d.daemon_id in osd_ids] found = list(zip(*args))[0] if args else [] not_found = {osd_id for osd_id in osd_ids if 'osd.%s' % osd_id not in found} @@ -1705,7 +1710,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): ] + extra_args, stdin=j) self.log.debug('create_daemon code %s out %s' % (code, out)) - if not code and host in self.service_cache: + if not code and host in self.daemon_cache: # prime cached service state with what we (should have) # just created sd = { @@ -1715,14 +1720,14 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): 'enabled': True, 'state': 'running', } - data = self.service_cache[host].data + data = self.daemon_cache[host].data if data: - data = [d for d in data if d['name'] != sd['name']] + data = [d for d in data if '%s.%s' % (daemon_type, daemon_id) != d['name']] data.append(sd) else: data = [sd] - self.service_cache[host] = orchestrator.OutdatableData(data) - self.service_cache.invalidate(host) + self.daemon_cache[host] = orchestrator.OutdatableData(data) + self.daemon_cache.invalidate(host) self.event.set() return "{} {} on host '{}'".format( 'Reconfigured' if reconfig else 'Deployed', name, host) @@ -1736,13 +1741,13 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): host, name, 'rm-daemon', ['--name', name]) self.log.debug('_remove_daemon code %s out %s' % (code, out)) - if not code and host in self.service_cache: + if not code and host in self.daemon_cache: # remove item from cache - data = self.service_cache[host].data + data = self.daemon_cache[host].data if data: data = [d for d in data if d['name'] != name] - self.service_cache[host] = orchestrator.OutdatableData(data) - self.service_cache.invalidate(host) + self.daemon_cache[host] = orchestrator.OutdatableData(data) + self.daemon_cache.invalidate(host) return "Removed {} from host '{}'".format(name, host) def _update_service(self, daemon_type, add_func, spec): @@ -1753,21 +1758,23 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): args = [] for d in daemons[0:to_remove]: args.append( - ('%s.%s' % (d.service_type, d.service_instance), d.nodename) + ('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename) ) return self._remove_daemon(args) elif len(daemons) < spec.count: return add_func(spec) return [] - return self._get_services(daemon_type, service_name=spec.name).then(___update_service) + return self._get_daemons(daemon_type, service_name=spec.name).then(___update_service) - def _add_new_daemon(self, svc_type: str, daemons: List[orchestrator.ServiceDescription], + def _add_new_daemon(self, + daemon_type: str, + daemons: List[orchestrator.DaemonDescription], spec: orchestrator.ServiceSpec, create_func: Callable): args = [] 
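_update_service() above reconciles the number of running daemons against spec.count: surplus daemons are removed from the front of the list, and the per-type add function is called when more are needed. A toy sketch of that reconciliation under the same assumptions (the tuples and the add/remove callables are stand-ins, not the patch's API):

    def reconcile(daemons, target_count, add, remove):
        if len(daemons) > target_count:
            surplus = daemons[0:len(daemons) - target_count]
            return remove(['%s.%s' % (t, i) for t, i, _host in surplus])
        elif len(daemons) < target_count:
            return add(target_count - len(daemons))
        return []

    existing = [('mds', 'a', 'host1'), ('mds', 'b', 'host2'), ('mds', 'c', 'host3')]
    print(reconcile(existing, 2,
                    add=lambda n: 'add %d more' % n,
                    remove=lambda names: 'remove %s' % names))   # removes mds.a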
num_added = 0 assert spec.count is not None - prefix = f'{svc_type}.{spec.name}' + prefix = f'{daemon_type}.{spec.name}' our_daemons = [d for d in daemons if d.name().startswith(prefix)] hosts_with_daemons = {d.nodename for d in daemons} hosts_without_daemons = {p for p in spec.placement.hosts if p.hostname not in hosts_with_daemons} @@ -1775,14 +1782,14 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): for host, _, name in hosts_without_daemons: if (len(our_daemons) + num_added) >= spec.count: break - svc_id = self.get_unique_name(host, daemons, spec.name, name) - self.log.debug('placing %s.%s on host %s' % (svc_type, svc_id, host)) - args.append((svc_id, host)) + daemon_id = self.get_unique_name(host, daemons, spec.name, name) + self.log.debug('placing %s.%s on host %s' % (daemon_type, daemon_id, host)) + args.append((daemon_id, host)) # add to daemon list so next name(s) will also be unique - sd = orchestrator.ServiceDescription( + sd = orchestrator.DaemonDescription( nodename=host, - service_type=svc_type, - service_instance=svc_id, + daemon_type=daemon_type, + daemon_id=daemon_id, ) daemons.append(sd) num_added += 1 @@ -1862,7 +1869,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): def update_mons_with_daemons(daemons): for _, _, name in spec.placement.hosts: - if name and len([d for d in daemons if d.service_instance == name]): + if name and len([d for d in daemons if d.daemon_id == name]): raise RuntimeError('name %s alrady exists', name) # explicit placement: enough hosts provided? @@ -1875,7 +1882,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): # TODO: we may want to chain the creation of the monitors so they join # the quorum one at a time. return self._create_mon(spec.placement.hosts) - return self._get_services('mon').then(update_mons_with_daemons) + return self._get_daemons('mon').then(update_mons_with_daemons) @async_map_completion def _create_mgr(self, host, name): @@ -1895,15 +1902,15 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): return self._create_daemon('mgr', name, host, keyring=keyring) - @with_services('mgr') - def update_mgrs(self, spec, services): - # type: (orchestrator.ServiceSpec, List[orchestrator.ServiceDescription]) -> orchestrator.Completion + @with_daemons('mgr') + def update_mgrs(self, spec, daemons): + # type: (orchestrator.ServiceSpec, List[orchestrator.DaemonDescription]) -> orchestrator.Completion """ Adjust the number of cluster managers. 
""" spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mgr').load() - num_mgrs = len(services) + num_mgrs = len(daemons) if spec.count == num_mgrs: return orchestrator.Completion(value="The requested number of managers exist.") @@ -1923,9 +1930,9 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): for standby in mgr_map.get('standbys', []): connected.append(standby.get('name', '')) to_remove_damons = [] - for d in services: - if d.service_instance not in connected: - to_remove_damons.append(('%s.%s' % (d.service_type, d.service_instance), + for d in daemons: + if d.daemon_id not in connected: + to_remove_damons.append(('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename)) num_to_remove -= 1 if num_to_remove == 0: @@ -1933,8 +1940,8 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): # otherwise, remove *any* mgr if num_to_remove > 0: - for d in services: - to_remove_damons.append(('%s.%s' % (d.service_type, d.service_instance), d.nodename)) + for d in daemons: + to_remove_damons.append(('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename)) num_to_remove -= 1 if num_to_remove == 0: break @@ -1953,11 +1960,11 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): len(spec.placement.hosts), num_new_mgrs)) for host_spec in spec.placement.hosts: - if host_spec.name and len([d for d in services if d.service_instance == host_spec.name]): + if host_spec.name and len([d for d in daemons if d.daemon_id == host_spec.name]): raise RuntimeError('name %s alrady exists', host_spec.name) for host_spec in spec.placement.hosts: - if host_spec.name and len([d for d in services if d.service_instance == host_spec.name]): + if host_spec.name and len([d for d in daemons if d.daemon_id == host_spec.name]): raise RuntimeError('name %s alrady exists', host_spec.name) self.log.info("creating {} managers on hosts: '{}'".format( @@ -1966,7 +1973,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): args = [] for host_spec in spec.placement.hosts: host = host_spec.hostname - name = host_spec.name or self.get_unique_name(host, services) + name = host_spec.name or self.get_unique_name(host, daemons) args.append((host, name)) c = self._create_mgr(args) c.add_progress('Creating MGRs', self) @@ -1987,10 +1994,10 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): }) def _add_mds(daemons): - # type: (List[orchestrator.ServiceDescription]) -> AsyncCompletion + # type: (List[orchestrator.DaemonDescription]) -> AsyncCompletion return self._add_new_daemon('mds', daemons, spec, self._create_mds) - return self._get_services('mds').then(_add_mds) + return self._get_daemons('mds').then(_add_mds) def update_mds(self, spec): # type: (orchestrator.ServiceSpec) -> AsyncCompletion @@ -2016,14 +2023,14 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): def _remove_mds(daemons): args = [] for d in daemons: - if d.service_instance == name or d.service_instance.startswith(name + '.'): + if d.daemon_id == name or d.daemon_id.startswith(name + '.'): args.append( - ('%s.%s' % (d.service_type, d.service_instance), d.nodename) + ('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename) ) if not args: raise OrchestratorError('Unable to find mds.%s[-*] daemon(s)' % name) return self._remove_daemon(args) - return self._get_services('mds').then(_remove_mds) + return self._get_daemons('mds').then(_remove_mds) def add_rgw(self, spec): if not spec.placement.hosts or 
len(spec.placement.hosts) < spec.count: @@ -2045,7 +2052,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): def _add_rgw(daemons): return self._add_new_daemon('rgw', daemons, spec, self._create_rgw) - return self._get_services('rgw').then(_add_rgw) + return self._get_daemons('rgw').then(_add_rgw) @async_map_completion def _create_rgw(self, rgw_id, host): @@ -2063,14 +2070,14 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): def _remove_rgw(daemons): args = [] for d in daemons: - if d.service_instance == name or d.service_instance.startswith(name + '.'): - args.append(('%s.%s' % (d.service_type, d.service_instance), + if d.daemon_id == name or d.daemon_id.startswith(name + '.'): + args.append(('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename)) if args: return self._remove_daemon(args) raise RuntimeError('Unable to find rgw.%s[-*] daemon(s)' % name) - return self._get_services('rgw').then(_remove_rgw) + return self._get_daemons('rgw').then(_remove_rgw) def update_rgw(self, spec): spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rgw').load() @@ -2084,7 +2091,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): def _add_rbd_mirror(daemons): return self._add_new_daemon('rbd-mirror', daemons, spec, self._create_rbd_mirror) - return self._get_services('rbd-mirror').then(_add_rbd_mirror) + return self._get_daemons('rbd-mirror').then(_add_rbd_mirror) @async_map_completion def _create_rbd_mirror(self, daemon_id, host): @@ -2101,16 +2108,16 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): def _remove_rbd_mirror(daemons): args = [] for d in daemons: - if not name or d.service_instance == name: + if not name or d.daemon_id == name: args.append( - ('%s.%s' % (d.service_type, d.service_instance), + ('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename) ) if not args and name: raise RuntimeError('Unable to find rbd-mirror.%s daemon' % name) return self._remove_daemon(args) - return self._get_services('rbd-mirror').then(_remove_rbd_mirror) + return self._get_daemons('rbd-mirror').then(_remove_rbd_mirror) def update_rbd_mirror(self, spec): spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rbd-mirror').load() @@ -2147,10 +2154,10 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): target_name = image else: raise OrchestratorError('must specify either image or version') - return self._get_services().then( + return self._get_daemons().then( lambda daemons: self._upgrade_check(target_name, daemons)) - def _upgrade_check(self, target_name, services): + def _upgrade_check(self, target_name, daemons): # get service state target_id, target_version = self._get_container_image_id(target_name) self.log.debug('Target image %s id %s version %s' % ( @@ -2162,7 +2169,7 @@ class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin): 'needs_update': dict(), 'up_to_date': list(), } - for s in services: + for s in daemons: if target_id == s.container_image_id: r['up_to_date'].append(s.name()) else: @@ -2304,7 +2311,7 @@ class NodeAssignment(object): self.spec = spec # type: orchestrator.ServiceSpec self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement) self.get_hosts_func = get_hosts_func - self.service_type = service_type + self.daemon_type = service_type def load(self): # type: () -> orchestrator.ServiceSpec @@ -2338,7 +2345,7 @@ class NodeAssignment(object): # host pool (assuming 
`count` is set) if not self.spec.placement.label and not self.spec.placement.hosts and self.spec.placement.count: logger.info("Found num spec. Looking for labeled nodes.") - # TODO: actually query for labels (self.service_type) + # TODO: actually query for labels (self.daemon_type) candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()], count=self.spec.placement.count) # Not enough nodes to deploy on @@ -2354,7 +2361,7 @@ class NodeAssignment(object): candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()], count=self.spec.placement.count) # Not enough nodes to deploy on if len(candidates) != self.spec.placement.count: - raise OrchestratorValidationError("Cannot place {} services on {} hosts.". + raise OrchestratorValidationError("Cannot place {} daemons on {} hosts.". format(self.spec.placement.count, len(candidates))) logger.info('Assigning nodes to spec: {}'.format(candidates)) diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index 6e1643c8885..dc237817d4f 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -11,7 +11,7 @@ try: except ImportError: pass -from orchestrator import ServiceDescription, InventoryNode, \ +from orchestrator import ServiceDescription, DaemonDescription, InventoryNode, \ ServiceSpec, PlacementSpec, RGWSpec, HostSpec, OrchestratorError from tests import mock from .fixtures import cephadm_module, wait @@ -50,7 +50,7 @@ class TestCephadm(object): def test_get_unique_name(self, cephadm_module): existing = [ - ServiceDescription(service_instance='mon.a') + DaemonDescription(daemon_type='mon', daemon_id='a') ] new_mon = cephadm_module.get_unique_name('myhost', existing, 'mon') match_glob(new_mon, 'mon.myhost.*') @@ -66,7 +66,7 @@ class TestCephadm(object): @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) def test_service_ls(self, cephadm_module): with self._with_host(cephadm_module, 'test'): - c = cephadm_module.describe_service(refresh=True) + c = cephadm_module.list_daemons(refresh=True) assert wait(cephadm_module, c) == [] @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]')) @@ -93,7 +93,7 @@ class TestCephadm(object): def test_daemon_action(self, _send_command, _get_connection, cephadm_module): cephadm_module.service_cache_timeout = 10 with self._with_host(cephadm_module, 'test'): - c = cephadm_module.describe_service(refresh=True) + c = cephadm_module.list_daemons(refresh=True) wait(cephadm_module, c) c = cephadm_module.daemon_action('redeploy', 'rgw', 'myrgw.foobar') assert wait(cephadm_module, c) == ["Deployed rgw.myrgw.foobar on host 'test'"] @@ -149,7 +149,7 @@ class TestCephadm(object): )) def test_remove_osds(self, cephadm_module): with self._with_host(cephadm_module, 'test'): - c = cephadm_module.describe_service(refresh=True) + c = cephadm_module.list_daemons(refresh=True) wait(cephadm_module, c) c = cephadm_module.remove_osds(['0']) out = wait(cephadm_module, c) @@ -235,7 +235,7 @@ class TestCephadm(object): )) def test_remove_rgw(self, cephadm_module): with self._with_host(cephadm_module, 'test'): - c = cephadm_module.describe_service(refresh=True) + c = cephadm_module.list_daemons(refresh=True) wait(cephadm_module, c) c = cephadm_module.remove_rgw('myrgw') out = wait(cephadm_module, c) diff --git a/src/pybind/mgr/orchestrator.py b/src/pybind/mgr/orchestrator.py index 90c6d349566..9f3a6dd2652 100644 --- a/src/pybind/mgr/orchestrator.py +++ 
b/src/pybind/mgr/orchestrator.py
@@ -868,6 +868,16 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
+    def list_daemons(self, daemon_type=None, daemon_id=None, host=None, refresh=False):
+        # type: (Optional[str], Optional[str], Optional[str], bool) -> Completion
+        """
+        Describe a daemon (of any kind) that is already configured in
+        the orchestrator.
+
+        :return: list of DaemonDescription objects.
+        """
+        raise NotImplementedError()
+
     def service_action(self, action, service_type, service_name):
         # type: (str, str, str) -> Completion
         """
@@ -1129,6 +1139,86 @@ def handle_type_error(method):
     return inner
 
 
+class DaemonDescription(object):
+    """
+    For responding to queries about the status of a particular daemon,
+    stateful or stateless.
+
+    This is not about health or performance monitoring of daemons: it's
+    about letting the orchestrator tell Ceph whether and where a
+    daemon is scheduled in the cluster.  When an orchestrator tells
+    Ceph "it's running on node123", that's not a promise that the process
+    is literally up this second, it's a description of where the orchestrator
+    has decided the daemon should run.
+    """
+
+    def __init__(self,
+                 daemon_type=None,
+                 daemon_id=None,
+                 nodename=None,
+                 container_id=None,
+                 container_image_id=None,
+                 container_image_name=None,
+                 version=None,
+                 status=None,
+                 status_desc=None):
+        # Node is at the same granularity as InventoryNode
+        self.nodename = nodename
+
+        # Not everyone runs in containers, but enough people do to
+        # justify having the container_id (runtime id) and container_image
+        # (image name)
+        self.container_id = container_id                  # runtime id
+        self.container_image_id = container_image_id      # image hash
+        self.container_image_name = container_image_name  # image friendly name
+
+        # The type of service (osd, mon, mgr, etc.)
+        self.daemon_type = daemon_type
+
+        # The orchestrator will have picked some names for daemons,
+        # typically either based on hostnames or on pod names.
+        # This is the <foo> in mds.<foo>, the ID that will appear
+        # in the FSMap/ServiceMap.
+        self.daemon_id = daemon_id
+
+        # Service version that was deployed
+        self.version = version
+
+        # Service status: -1 error, 0 stopped, 1 running
+        self.status = status
+
+        # Service status description when status == -1.
+ self.status_desc = status_desc + + # datetime when this info was last refreshed + self.last_refresh = None # type: Optional[datetime.datetime] + + def name(self): + return '%s.%s' % (self.daemon_type, self.daemon_id) + + def __repr__(self): + return "({type}.{id})".format(type=self.daemon_type, + id=self.daemon_id) + + def to_json(self): + out = { + 'nodename': self.nodename, + 'container_id': self.container_id, + 'container_image_id': self.container_image_id, + 'container_image_name': self.container_image_name, + 'daemon_id': self.daemon_id, + 'daemon_type': self.daemon_type, + 'version': self.version, + 'status': self.status, + 'status_desc': self.status_desc, + } + return {k: v for (k, v) in out.items() if v is not None} + + @classmethod + @handle_type_error + def from_json(cls, data): + return cls(**data) + class ServiceDescription(object): """ For responding to queries about the status of a particular service, diff --git a/src/pybind/mgr/orchestrator_cli/module.py b/src/pybind/mgr/orchestrator_cli/module.py index 4722dd3f9b7..b817c33b9e7 100644 --- a/src/pybind/mgr/orchestrator_cli/module.py +++ b/src/pybind/mgr/orchestrator_cli/module.py @@ -288,31 +288,31 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): return HandleCommandResult(stdout='\n'.join(out)) @orchestrator._cli_read_command( - 'orch service ls', + 'orch ps', "name=host,type=CephString,req=false " - "name=svc_type,type=CephChoices,strings=mon|mgr|osd|mds|iscsi|nfs|rgw|rbd-mirror,req=false " - "name=svc_id,type=CephString,req=false " + "name=daemon_type,type=CephChoices,strings=mon|mgr|osd|mds|iscsi|nfs|rgw|rbd-mirror,req=false " + "name=daemon_id,type=CephString,req=false " "name=format,type=CephChoices,strings=json|plain,req=false " "name=refresh,type=CephBool,req=false", - 'List services known to orchestrator') - def _list_services(self, host=None, svc_type=None, svc_id=None, format='plain', refresh=False): - # XXX this is kind of confusing for people because in the orchestrator - # context the service ID for MDS is the filesystem ID, not the daemon ID - - completion = self.describe_service(svc_type, svc_id, host, refresh=refresh) + 'List daemons known to orchestrator') + def _list_daemons(self, host=None, daemon_type=None, daemon_id=None, format='plain', refresh=False): + completion = self.list_daemons(daemon_type, + daemon_id=daemon_id, + host=host, + refresh=refresh) self._orchestrator_wait([completion]) orchestrator.raise_if_exception(completion) - services = completion.result + daemons = completion.result def ukn(s): return '' if s is None else s # Sort the list for display - services.sort(key=lambda s: (ukn(s.service_type), ukn(s.nodename), ukn(s.service_instance))) + daemons.sort(key=lambda s: (ukn(s.daemon_type), ukn(s.nodename), ukn(s.daemon_id))) - if len(services) == 0: - return HandleCommandResult(stdout="No services reported") + if len(daemons) == 0: + return HandleCommandResult(stdout="No daemons reported") elif format == 'json': - data = [s.to_json() for s in services] + data = [s.to_json() for s in daemons] return HandleCommandResult(stdout=json.dumps(data)) else: now = datetime.datetime.utcnow() @@ -323,7 +323,7 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): table.align = 'l' table.left_padding_width = 0 table.right_padding_width = 1 - for s in sorted(services, key=lambda s: s.name()): + for s in sorted(daemons, key=lambda s: s.name()): status = { -1: 'error', 0: 'stopped', @@ -610,9 +610,9 @@ Usage: @orchestrator._cli_write_command( 'orch daemon', 
"name=action,type=CephChoices,strings=start|stop|restart|redeploy|reconfig " - "name=name,type=CephString ", + "name=name,type=CephString", 'Start, stop, restart, redeploy, or reconfig a specific daemon') - def _service_instance_action(self, action, name): + def _daemon_action(self, action, name): if '.' not in name: raise orchestrator.OrchestratorError('%s is not a valid daemon name' % name) (daemon_type, daemon_id) = name.split('.', 1)