From: Joshua Schmid
Date: Wed, 26 Feb 2020 13:26:42 +0000 (+0100)
Subject: mgr/cephadm: leverage service specs
X-Git-Tag: v15.1.1~191^2~14
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=09ad4d39f40bbb7fb363d54c3bed4582332565d1;p=ceph.git

mgr/cephadm: leverage service specs

Fixes: https://tracker.ceph.com/issues/44205

This does a couple of things:

* Change the way apply_$service() works:

  Instead of triggering the deployment mechanism directly, it now
  transforms the passed ServiceSpec into a JSON representation and
  saves it in a persistent mon_store section:

  `mgr/cephadm/service_spec/$service|daemon_type/service_name`

  These locations are checked periodically in the serve() thread.
  This works since all apply_$service_type functions are idempotent.

* Allow saving a config-like specification in the mon_store:

  `ceph orch apply -i <file>` reads the specified services and saves
  them in the mon_store section mentioned above. The same serve()
  mechanism described above then takes care of the deployment.

Signed-off-by: Joshua Schmid
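In code terms, the flow described above reduces to "persist now, deploy
later". A minimal, self-contained sketch (illustrative only: mon_store is
a stand-in dict rather than the real PersistentStoreDict, and deploy() is
a placeholder for the idempotent _apply_* machinery in the diff below):

    import json

    mon_store = {}  # stand-in for the mgr's persistent key-value store

    def apply_service(service_type, service_name, spec):
        # "apply" no longer deploys anything; it only persists the spec
        key = 'mgr/cephadm/service_spec/%s/%s' % (service_type, service_name)
        mon_store[key] = json.dumps(spec)

    def serve_tick():
        # the serve() thread periodically turns saved specs into deployments;
        # deploy() is idempotent, so re-running it every tick is safe
        for key, raw in mon_store.items():
            deploy(json.loads(raw))

    def deploy(spec):
        print('ensuring %s matches its spec' % spec['service_type'])

    apply_service('rgw', 'myrealm.myzone',
                  {'service_type': 'rgw', 'placement': {'count': 2}})
    serve_tick()  # prints: ensuring rgw matches its spec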
---

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index ea0a34fa1ff0..551dac3764c1 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -2,6 +2,7 @@
 import json
 import errno
 import logging
 import time
+import yaml
 from threading import Event
 from functools import wraps
@@ -9,7 +10,7 @@
 from mgr_util import create_self_signed_cert
 import string
 try:
-    from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple
+    from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple, Iterator
     from typing import TYPE_CHECKING
 except ImportError:
     TYPE_CHECKING = False  # just for type checking
@@ -25,16 +26,16 @@
 import re
 import shutil
 import subprocess
 import uuid
+from mgr_module import PersistentStoreDict

 from ceph.deployment import inventory, translate
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.drive_selection import selector

 from mgr_module import MgrModule
-import mgr_util
 import orchestrator
 from orchestrator import OrchestratorError, HostPlacementSpec, OrchestratorValidationError, HostSpec, \
-    CLICommandMeta
+    CLICommandMeta, ServiceSpec
 from . import remotes
 from ._utils import RemoveUtil
@@ -585,6 +586,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.osd_removal_report: dict = dict()
         self.rm_util = RemoveUtil(self)

+        self.service_spec_store = PersistentStoreDict(self, 'service_spec')
+
         # ensure the host lists are in sync
         for h in self.inventory.keys():
             if h not in self.cache.daemons:
@@ -952,6 +955,18 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         while self.run:
             self._check_hosts()
             self._remove_osds_bg()
+            completions = self._apply_services()
+            for completion in completions:
+                if completion:
+                    while not completion.has_result:
+                        self.process([completion])
+                        self.log.debug(f'Still processing {completion}')
+                        if completion.needs_result:
+                            time.sleep(1)
+                        else:
+                            break
+                    if completion.exception is not None:
+                        self.log.error(str(completion.exception))

             # refresh daemons
             self.log.debug('refreshing hosts')
@@ -1579,20 +1594,31 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             # ugly sync path, FIXME someday perhaps?
             for host, hi in self.inventory.items():
                 self._refresh_host_daemons(host)
+
         sm = {}  # type: Dict[str, orchestrator.ServiceDescription]
         for h, dm in self.cache.daemons.items():
             for name, dd in dm.items():
                 if service_type and service_type != dd.daemon_type:
                     continue
-                n = dd.service_name()
+                # i.e. rgw.realm.zone
+                n: str = dd.service_name()
                 if service_name and service_name != n:
                     continue
+                service_store = PersistentStoreDict(self, f'service_spec/{dd.daemon_type}')
+                try:
+                    _ = service_store[dd.service_name(without_service_type=True)]
+                    spec_presence = "present"
+                except IndexError:
+                    spec_presence = "absent"
+                if dd.daemon_type == 'osd':
+                    spec_presence = "not applicable"
                 if n not in sm:
                     sm[n] = orchestrator.ServiceDescription(
                         service_name=n,
                         last_refresh=dd.last_refresh,
                         container_image_id=dd.container_image_id,
                         container_image_name=dd.container_image_name,
+                        spec_presence=spec_presence,
                     )
                 sm[n].size += 1
                 if dd.status == 1:
@@ -1684,6 +1710,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             for name in names:
                 if name in dm:
                     args.append((name, host, force))
+                    # TODO: bail out for OSDs when https://github.com/ceph/ceph/pull/32983 is merged
+                    if dm[name].daemon_type == 'osd':
+                        continue
+                    self._remove_key_from_store(dm[name].daemon_type,
+                                                dm[name].service_name(without_service_type=True))
         if not args:
             raise OrchestratorError('Unable to find daemon(s) %s' % (names))
         self.log.info('Remove daemons %s' % [a[0] for a in args])
@@ -1697,6 +1728,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             args.append(
                 (d.name(), d.hostname, force)
             )
+            self._remove_key_from_store(d.daemon_type, d.service_name(without_service_type=True))
         if not args:
             raise OrchestratorError('Unable to find daemons in %s service' % (
                 service_name))
@@ -1704,6 +1736,14 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             service_name, [a[0] for a in args]))
         return self._remove_daemon(args)

+    def _remove_key_from_store(self, service_type: str, name: str) -> None:
+        self.log.debug(f"Removing {name} from the service_spec store")
+        try:
+            store = PersistentStoreDict(self, f'service_spec/{service_type}')
+            del store[name]
+        except IndexError:
+            self.log.debug(f"{service_type}/{name} not found in store. Can't remove.")
+
     def get_inventory(self, host_filter=None, refresh=False):
         """
         Return the storage inventory of hosts matching the given filter.
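For orientation, the key layout that _remove_key_from_store() operates on
can be made explicit (an illustrative helper, not part of the patch; the
daemon and service names are made up):

    def spec_store_key(daemon_type, service_name):
        # illustrative only: the mon_store key that _remove_key_from_store()
        # deletes when a daemon of the service is removed
        return 'mgr/cephadm/service_spec/%s/%s' % (daemon_type, service_name)

    # rgw daemon 'rgw.realm.zone.host1.abc' belongs to service 'realm.zone'
    assert spec_store_key('rgw', 'realm.zone') == 'mgr/cephadm/service_spec/rgw/realm.zone'
    # mds daemon 'mds.fsname.host1.xyz' belongs to service 'fsname'
    assert spec_store_key('mds', 'fsname') == 'mgr/cephadm/service_spec/mds/fsname'
    # OSDs are skipped above until the referenced OSD-removal PR is merged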
@@ -2123,6 +2163,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         return self._add_daemon('mgr', spec, self._create_mgr)

     def apply_mgr(self, spec):
+        self.save_spec(spec)
+        self._kick_serve_loop()
+        return trivial_result("scheduled MGR creation..")
+
+    def _apply_mgr(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply_service('mgr', spec, self._create_mgr)
@@ -2130,7 +2175,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._add_daemon('mds', spec, self._create_mds, self._config_mds)

-    def apply_mds(self, spec):
+    def apply_mds(self, spec: orchestrator.ServiceSpec) -> orchestrator.Completion:
+        self.save_spec(spec)
+        self._kick_serve_loop()
+        return trivial_result("scheduled MDS creation..")
+
+    def _apply_mds(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply_service('mds', spec, self._create_mds, self._config_mds)
@@ -2187,6 +2237,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         return self._create_daemon('rgw', rgw_id, host, keyring=keyring)

     def apply_rgw(self, spec):
+        self.save_spec(spec)
+        self._kick_serve_loop()
+        return trivial_result("scheduled RGW creation..")
+
+    def _apply_rgw(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply_service('rgw', spec, self._create_rgw, self._config_rgw)
@@ -2206,6 +2261,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                                    keyring=keyring)

     def apply_rbd_mirror(self, spec):
+        self.save_spec(spec)
+        self._kick_serve_loop()
+        return trivial_result("scheduled rbd-mirror creation..")
+
+    def _apply_rbd_mirror(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply_service('rbd-mirror', spec, self._create_rbd_mirror)
@@ -2405,16 +2465,26 @@ receivers:
     def _create_prometheus(self, daemon_id, host):
         return self._create_daemon('prometheus', daemon_id, host)

-    def apply_prometheus(self, spec):
+    def _apply_prometheus(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply_service('prometheus', spec, self._create_prometheus)

+    def apply_prometheus(self, spec):
+        self.save_spec(spec)
+        self._kick_serve_loop()
+        return trivial_result("scheduled prometheus creation..")
+
     def add_node_exporter(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._add_daemon('node-exporter', spec,
                                 self._create_node_exporter)

     def apply_node_exporter(self, spec):
+        self.save_spec(spec)
+        self._kick_serve_loop()
+        return trivial_result("scheduled node-exporter creation..")
+
+    def _apply_node_exporter(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply_service('node-exporter', spec,
                                     self._create_node_exporter)
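Every apply_$service() above now follows the same schedule-only pattern.
Reduced to its essentials it behaves as follows (stand-in class, purely
illustrative; the real methods return an orchestrator Completion, which is
modeled here as a plain string):

    class MiniOrch(object):
        # stand-in for CephadmOrchestrator, showing only the new contract
        def __init__(self):
            self.saved, self.kicked = [], False

        def save_spec(self, spec):
            self.saved.append(spec)    # persist to the mon_store

        def _kick_serve_loop(self):
            self.kicked = True         # wake the serve() thread

        def apply_rgw(self, spec):
            self.save_spec(spec)
            self._kick_serve_loop()
            return "scheduled RGW creation.."  # deployment happens later

    orch = MiniOrch()
    assert orch.apply_rgw({'service_type': 'rgw'}) == "scheduled RGW creation.."
    assert orch.saved and orch.kicked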
+ """ + store = PersistentStoreDict(self, f'service_spec/{spec.service_type}') + name: str = spec.name or spec.service_type + try: + _ = store[name] + raise orchestrator.OrchestratorError(f"Specification for {name} already exists. " + "Please review your existing specs with " + "`ceph orch servicespecs ls` and try again.") + except IndexError: + self.log.info(f"New spec found. Saving <{name}> to the store.") + store[name] = spec.to_json() + + def list_specs(self) -> orchestrator.Completion: + """ + Loads all entries from the service_spec mon_store root. + """ + specs = list() + for _, spec in self.service_spec_store.items(): + specs.append('---') + specs.append(yaml.dump(spec)) + return trivial_result(specs) + + def clear_all_specs(self) -> orchestrator.Completion: + """ + Clears the service_spec root in the mon_store (mgr/cephadm/service_specs/) + """ + self.service_spec_store.clear() + return trivial_result(f"Mon store for {self.service_spec_store.prefix} cleared") + + def apply_service_config(self, spec_document: str) -> orchestrator.Completion: + """ + Parse a multi document yaml file (represented in a inbuf object) + and loads it with it's respective ServiceSpec to validate the + initial input. + If no errors are raised `save_spec` is called. + """ + content: Iterator[Any] = yaml.load_all(spec_document) + # Load all specs from a multi document yaml file. + for spec in content: + # load ServiceSpec once to validate + spec_o = ServiceSpec.from_json(spec) + self.save_spec(spec_o) + self._kick_serve_loop() + return trivial_result("ServiceSpecs saved") + + def trigger_deployment(self, service_name: str, + func: Callable[[ServiceSpec], orchestrator.Completion]) -> List[orchestrator.Completion]: + """ + Triggers a corresponding deployment method `func` to `service_name` + Services can have multiple entries. (i.e. different RGW configurations) + """ + self.log.debug(f"starting async {service_name} deployment") + specs = self.find_json_specs(service_name) + completions = list() + for spec in specs: + completions.append(func(spec)) + if completions: + return completions + return [trivial_result("Nothing to do..")] + + def find_json_specs(self, find_service_type: str) -> List[ServiceSpec]: + """ + Inspects the mon_store and gathers entries for the `find_service_type` + (i.e. 'mgr', 'rgw') service. + Some services have individual ServiceSpecs (rgw->RGWSpec, nfs->NFSServiceSpec) + so we need to make the distinction. + """ + specs = list() + self.log.debug(f"Checking for type {find_service_type}") + store = PersistentStoreDict(self, f'service_spec/{find_service_type}') + for service_type, json_specs in store.items(): + + if isinstance(json_specs, dict): + self.log.debug(f"Found dict in json_specs: No need to decode") + elif isinstance(json_specs, str): + self.log.debug(f"Found str in json_specs: Decoding from json") + json_specs = json.loads(json_specs) + + self.log.debug(f"Found service_type: {service_type} in the k-v store. Adding..") + specs.append(ServiceSpec.from_json(json_specs)) + self.log.debug(f"Found {specs} specs.") + return specs + + def _apply_services(self) -> List[orchestrator.Completion]: + """ + This is a method that is supposed to run continuously in the + server() thread. + It will initiate deployments based on the presence of a ServiceSpec + in the persistent mon_store. 
+    def _apply_services(self) -> List[orchestrator.Completion]:
+        """
+        This method is supposed to run continuously in the serve() thread.
+        It initiates deployments based on the presence of a ServiceSpec
+        in the persistent mon_store.
+
+        There is a defined order in which the services should be deployed:
+
+        mon -> mgr -> osd -> monitoring -> mds -> rgw -> nfs -> iscsi -> rbd-mirror
+
+        Special cases:
+
+        * Scaling mons is currently not implemented.
+        * OSDs are daemons that are handled differently and may not fit
+          into this paradigm.
+
+        The serve() thread processes the completions serially, which ensures
+        adherence to the defined order.
+        """
+
+        super_completions: List[orchestrator.Completion] = list()
+        super_completions.extend(self.trigger_deployment('mgr', self._apply_mgr))
+        super_completions.extend(self.trigger_deployment('prometheus', self._apply_prometheus))
+        super_completions.extend(self.trigger_deployment('node-exporter', self._apply_node_exporter))
+        super_completions.extend(self.trigger_deployment('mds', self._apply_mds))
+        super_completions.extend(self.trigger_deployment('rgw', self._apply_rgw))
+        super_completions.extend(self.trigger_deployment('rbd-mirror', self._apply_rbd_mirror))
+
+        # Not implemented:
+        # super_completions.extend(trigger_deployment('mon', self._apply_mon))
+        # super_completions.extend(trigger_deployment('nfs', self._apply_nfs))
+        # super_completions.extend(trigger_deployment('grafana', self._apply_grafana))
+        # super_completions.extend(trigger_deployment('iscsi', self._apply_iscsi))
+
+        return super_completions
+

 class BaseScheduler(object):
     """
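Since apply_*() now only schedules work, the test changes below switch to
the underlying _apply_*() methods wherever a synchronous deployment is
asserted. A hypothetical test for the scheduling path itself, written in
the style of the suite below (not part of this patch):

    def test_apply_mgr_schedules_only(self, cephadm_module):
        # hypothetical: apply_mgr() should persist the spec and return
        # immediately with a scheduling message instead of deploying
        with self._with_host(cephadm_module, 'test'):
            ps = PlacementSpec(hosts=['test'], count=1)
            c = cephadm_module.apply_mgr(ServiceSpec(placement=ps, service_type='mgr'))
            assert wait(cephadm_module, c) == "scheduled MGR creation.."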
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index e1350c516ee4..3976dc1030b7 100644
--- a/src/pybind/mgr/cephadm/tests/test_cephadm.py
+++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -137,12 +137,12 @@ class TestCephadm(object):
     def test_mon_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
-            c = cephadm_module.add_mon(ServiceSpec(placement=ps))
+            c = cephadm_module.add_mon(ServiceSpec(placement=ps, service_type='mon'))
             assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]

             with pytest.raises(OrchestratorError, match="is missing a network spec"):
                 ps = PlacementSpec(hosts=['test'], count=1)
-                c = cephadm_module.add_mon(ServiceSpec(placement=ps))
+                c = cephadm_module.add_mon(ServiceSpec(placement=ps, service_type='mon'))
                 wait(cephadm_module, c)

     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
@@ -154,7 +154,7 @@ class TestCephadm(object):
     def test_mgr_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
-            c = cephadm_module.apply_mgr(ServiceSpec(placement=ps))
+            c = cephadm_module._apply_mgr(ServiceSpec(placement=ps, service_type='mgr'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed mgr.* on host 'test'")
@@ -202,7 +202,7 @@ class TestCephadm(object):
     def test_mds(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
-            c = cephadm_module.add_mds(ServiceSpec('name', placement=ps))
+            c = cephadm_module.add_mds(ServiceSpec('name', placement=ps, service_type='mds'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed mds.name.* on host 'test'")
@@ -216,7 +216,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
-            c = cephadm_module.add_rgw(RGWSpec('realm', 'zone', placement=ps))
+            c = cephadm_module.add_rgw(RGWSpec('realm', 'zone', placement=ps, service_type='rgw'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed rgw.realm.zone.* on host 'test'")
@@ -232,12 +232,12 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'host1'):
             with self._with_host(cephadm_module, 'host2'):
                 ps = PlacementSpec(hosts=['host1'], count=1)
-                c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps))
+                c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
                 [out] = wait(cephadm_module, c)
                 match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")

                 ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
-                c = cephadm_module.apply_rgw(RGWSpec('realm', 'zone1', placement=ps))
+                c = cephadm_module._apply_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
                 [out] = wait(cephadm_module, c)
                 match_glob(out, "Deployed rgw.realm.zone1.host2.* on host 'host2'")
@@ -252,12 +252,12 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'host1'):
             with self._with_host(cephadm_module, 'host2'):
                 ps = PlacementSpec(hosts=['host1'], count=1)
-                c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps))
+                c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
                 [out] = wait(cephadm_module, c)
                 match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")

                 ps = PlacementSpec(hosts=['host2'], count=1)
-                c = cephadm_module.add_rgw(RGWSpec('realm', 'zone2', placement=ps))
+                c = cephadm_module.add_rgw(RGWSpec('realm', 'zone2', placement=ps, service_type='rgw'))
                 [out] = wait(cephadm_module, c)
                 match_glob(out, "Deployed rgw.realm.zone2.host2.* on host 'host2'")
@@ -267,7 +267,7 @@ class TestCephadm(object):
             with pytest.raises(OrchestratorError):
                 ps = PlacementSpec(hosts=['host1', 'host2'], count=3)
-                c = cephadm_module.apply_rgw(RGWSpec('realm', 'zone1', placement=ps))
+                c = cephadm_module._apply_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
                 [out] = wait(cephadm_module, c)
@@ -285,7 +285,8 @@ class TestCephadm(object):
     ))
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
-    def test_remove_daemon(self, _save_host, _rm_host, cephadm_module):
+    @mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
+    def test_remove_daemon(self, _save_host, _rm_host, _save_spec, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
             wait(cephadm_module, c)
@@ -307,7 +308,8 @@ class TestCephadm(object):
     ))
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
-    def test_remove_service(self, _save_host, _rm_host, cephadm_module):
+    @mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
+    def test_remove_service(self, _save_host, _rm_host, _save_spec, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
             wait(cephadm_module, c)
@@ -325,7 +327,7 @@ class TestCephadm(object):
         # type: (mock.Mock, mock.Mock, mock.Mock, mock.Mock, CephadmOrchestrator) -> None
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
-            c = cephadm_module.add_rbd_mirror(ServiceSpec(name='name', placement=ps))
+            c = cephadm_module.add_rbd_mirror(ServiceSpec(name='name', placement=ps, service_type='rbd-mirror'))
             [out] = wait(cephadm_module, c)
            match_glob(out, "Deployed rbd-mirror.* on host 'test'")
@@ -340,7 +342,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)

-            c = cephadm_module.add_prometheus(ServiceSpec(placement=ps))
+            c = cephadm_module.add_prometheus(ServiceSpec(placement=ps, service_type='prometheus'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed prometheus.* on host 'test'")
@@ -355,7 +357,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)

-            c = cephadm_module.add_node_exporter(ServiceSpec(placement=ps))
+            c = cephadm_module.add_node_exporter(ServiceSpec(placement=ps, service_type='node-exporter'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed node-exporter.* on host 'test'")
@@ -370,7 +372,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)

-            c = cephadm_module.add_grafana(ServiceSpec(placement=ps))
+            c = cephadm_module.add_grafana(ServiceSpec(placement=ps, service_type='grafana'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed grafana.* on host 'test'")
on host 'test'") @@ -340,7 +342,7 @@ class TestCephadm(object): with self._with_host(cephadm_module, 'test'): ps = PlacementSpec(hosts=['test'], count=1) - c = cephadm_module.add_prometheus(ServiceSpec(placement=ps)) + c = cephadm_module.add_prometheus(ServiceSpec(placement=ps, service_type='prometheus')) [out] = wait(cephadm_module, c) match_glob(out, "Deployed prometheus.* on host 'test'") @@ -355,7 +357,7 @@ class TestCephadm(object): with self._with_host(cephadm_module, 'test'): ps = PlacementSpec(hosts=['test'], count=1) - c = cephadm_module.add_node_exporter(ServiceSpec(placement=ps)) + c = cephadm_module.add_node_exporter(ServiceSpec(placement=ps, service_type='node-exporter')) [out] = wait(cephadm_module, c) match_glob(out, "Deployed node-exporter.* on host 'test'") @@ -370,7 +372,7 @@ class TestCephadm(object): with self._with_host(cephadm_module, 'test'): ps = PlacementSpec(hosts=['test'], count=1) - c = cephadm_module.add_grafana(ServiceSpec(placement=ps)) + c = cephadm_module.add_grafana(ServiceSpec(placement=ps, service_type='grafana')) [out] = wait(cephadm_module, c) match_glob(out, "Deployed grafana.* on host 'test'") diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index 2b62d0672b40..2d33651cd87e 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -1562,7 +1562,7 @@ class PersistentStoreDict(object): def __init__(self, mgr, prefix): # type: (MgrModule, str) -> None self.mgr = mgr - self.prefix = prefix + '.' + self.prefix = prefix + '/' def _mk_store_key(self, key): return self.prefix + key @@ -1587,7 +1587,7 @@ class PersistentStoreDict(object): self.__missing__(key) return json.loads(val) except (KeyError, AttributeError, IndexError, ValueError, TypeError): - logging.getLogger(__name__).exception('failed to deserialize') + logging.getLogger(__name__).debug('failed to deserialize item in store') self.mgr.set_store(key, None) raise diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 7e79329453d7..b5bc21cbe90f 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -8,6 +8,7 @@ import copy import functools import logging import pickle +import json import sys import time from collections import namedtuple @@ -913,6 +914,12 @@ class Orchestrator(object): """ raise NotImplementedError() + def apply_service_config(self, spec_document: str) -> Completion: + """ + Saves Service Specs from a yaml|json file + """ + raise NotImplementedError() + def remove_daemons(self, names, force): # type: (List[str], bool) -> Completion """ @@ -922,6 +929,20 @@ class Orchestrator(object): """ raise NotImplementedError() + def list_specs(self): + # type: () -> Completion + """ + Lists saved service specs + """ + raise NotImplementedError() + + def clear_all_specs(self): + # type: () -> Completion + """ + Lists saved service specs + """ + raise NotImplementedError() + def remove_service(self, service_name): # type: (str) -> Completion """ @@ -1222,7 +1243,6 @@ class PlacementSpec(object): else: self.hosts = [HostPlacementSpec.parse(x, require_network=False) for x in hosts if x] - self.count = count # type: Optional[int] self.all_hosts = all_hosts # type: bool @@ -1384,12 +1404,18 @@ class DaemonDescription(object): return self.name().startswith(service_name + '.') return False - def service_name(self): + def service_name(self, without_service_type=False): if self.daemon_type == 'rgw': v = self.daemon_id.split('.') - return 'rgw.%s' % 
@@ -1442,6 +1468,7 @@ class ServiceDescription(object):
                  service_url=None,
                  last_refresh=None,
                  size=0,
+                 spec_presence='absent',
                  running=0):
         # Not everyone runs in containers, but enough people do to
         # justify having the container_image_id (image hash) and container_image
@@ -1470,6 +1497,11 @@ class ServiceDescription(object):
         # datetime when this info was last refreshed
         self.last_refresh = last_refresh  # type: Optional[datetime.datetime]

+        # status string to indicate the presence of a persistent servicespec.
+        # possible strings are "absent", "present" and "not applicable";
+        # "not applicable" is mostly for OSDs.
+        self.spec_presence = spec_presence
+
     def service_type(self):
         if self.service_name:
             return self.service_name.split('.')[0]
@@ -1514,8 +1546,8 @@ class ServiceSpec(object):
     """

-    def __init__(self, name=None, placement=None):
-        # type: (Optional[str], Optional[PlacementSpec]) -> None
+    def __init__(self, name=None, placement=None, service_type=None, count=None):
+        # type: (Optional[str], Optional[PlacementSpec], Optional[str], Optional[int]) -> None
         self.placement = PlacementSpec() if placement is None else placement  # type: PlacementSpec

         #: Give this set of stateless services a name: typically it would
@@ -1523,12 +1555,46 @@ class ServiceSpec(object):
         #: within one ceph cluster. Note: Not all clusters have a name
         self.name = name  # type: Optional[str]

+        assert service_type
+        self.service_type = service_type
+
         if self.placement is not None and self.placement.count is not None:
             #: Count of service instances. Deprecated.
             self.count = self.placement.count  # type: int
         else:
             self.count = 1

+    @classmethod
+    def from_json(cls, json_spec: dict) -> "ServiceSpec":
+        """
+        Initialize 'ServiceSpec' object data from a json structure
+        :param json_spec: A valid dict with a ServiceSpec
+        """
+        args: Dict[str, Dict[Any, Any]] = {}
+        service_type = json_spec.get('service_type', '')
+        assert service_type
+        if service_type == 'rgw':
+            _cls = RGWSpec  # type: ignore
+        elif service_type == 'nfs':
+            _cls = NFSServiceSpec  # type: ignore
+        else:
+            _cls = ServiceSpec  # type: ignore
+        for k, v in json_spec.items():
+            if k == 'placement':
+                v = PlacementSpec.from_dict(v)
+            if k == 'spec':
+                args.update(v)
+                continue
+            args.update({k: v})
+        return _cls(**args)  # type: ignore
+
+    def to_json(self):
+        return json.dumps(self, default=lambda o: o.__dict__,
+                          sort_keys=True, indent=4)
+
+    def __repr__(self):
+        return "{}({!r})".format(self.__class__.__name__, self.__dict__)
+

 def servicespec_validate_add(self: ServiceSpec):
     # This must not be a method of ServiceSpec, otherwise you'll hunt
@@ -1552,10 +1618,9 @@ def servicespec_validate_hosts_have_network_spec(self: ServiceSpec):
         raise OrchestratorValidationError(m)

-
 class NFSServiceSpec(ServiceSpec):
-    def __init__(self, name, pool=None, namespace=None, placement=None):
-        super(NFSServiceSpec, self).__init__(name, placement)
+    def __init__(self, name, pool=None, namespace=None, placement=None, service_type=None):
+        super(NFSServiceSpec, self).__init__(name=name, placement=placement, service_type=service_type)

         #: RADOS pool where NFS client recovery data is stored.
         self.pool = pool
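The class dispatch performed by ServiceSpec.from_json() above can be
modeled standalone; a trimmed-down sketch (illustrative only; the real
method also flattens a nested 'spec' key and converts 'placement' via
PlacementSpec.from_dict):

    def pick_spec_class(json_spec):
        # mirrors the dispatch in ServiceSpec.from_json(): the service_type
        # field decides which spec class the dict is loaded into
        service_type = json_spec.get('service_type', '')
        assert service_type
        return {'rgw': 'RGWSpec', 'nfs': 'NFSServiceSpec'}.get(service_type, 'ServiceSpec')

    assert pick_spec_class({'service_type': 'rgw', 'rgw_realm': 'r', 'rgw_zone': 'z'}) == 'RGWSpec'
    assert pick_spec_class({'service_type': 'mgr'}) == 'ServiceSpec'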
@@ -1579,6 +1644,8 @@ class RGWSpec(ServiceSpec):
                  rgw_realm,      # type: str
                  rgw_zone,       # type: str
                  placement=None,
+                 service_type=None,  # type: Optional[str]
+                 name=None,      # type: Optional[str]
                  hosts=None,     # type: Optional[List[str]]
                  rgw_multisite=None,  # type: Optional[bool]
                  rgw_zonemaster=None,  # type: Optional[bool]
@@ -1596,7 +1663,7 @@ class RGWSpec(ServiceSpec):
         # in Rook itself. Thus we don't set any defaults here in this class.
         super(RGWSpec, self).__init__(name=rgw_realm + '.' + rgw_zone,
-                                      placement=placement)
+                                      placement=placement, service_type=service_type)

         #: List of hosts where RGWs should run. Not for Rook.
         if hosts:
@@ -1625,8 +1692,8 @@ class RGWSpec(ServiceSpec):
     @property
     def rgw_multisite_endpoints_list(self):
         return ",".join(["{}://{}:{}".format(self.rgw_multisite_proto,
-                         host,
-                         self.rgw_frontend_port) for host in self.placement.hosts])
+                                             host,
+                                             self.rgw_frontend_port) for host in self.placement.hosts])

     def genkey(self, nchars):
         """ Returns a random string of nchars
@@ -1639,17 +1706,6 @@ class RGWSpec(ServiceSpec):
                                      string.ascii_lowercase +
                                      string.digits) for _ in range(nchars))

-    @classmethod
-    def from_json(cls, json_rgw_spec):
-        # type: (dict) -> RGWSpec
-        """
-        Initialize 'RGWSpec' object data from a json structure
-        :param json_rgw_spec: A valid dict with a the RGW settings
-        """
-        # TODO: also add PlacementSpec(**json_rgw_spec['placement'])
-        args = {k:v for k, v in json_rgw_spec.items()}
-        return RGWSpec(**args)
-

 class InventoryFilter(object):
     """

diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py
index dfdcc8b8947d..738966c177aa 100644
--- a/src/pybind/mgr/orchestrator/module.py
+++ b/src/pybind/mgr/orchestrator/module.py
@@ -335,7 +335,7 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule):
         else:
             now = datetime.datetime.utcnow()
             table = PrettyTable(
-                ['NAME', 'RUNNING', 'REFRESHED', 'IMAGE NAME', 'IMAGE ID'],
+                ['NAME', 'RUNNING', 'REFRESHED', 'IMAGE NAME', 'IMAGE ID', 'SPEC'],
                 border=False)
             table.align['NAME'] = 'l'
             table.align['RUNNING'] = 'r'
@@ -354,7 +354,9 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule):
                     '%d/%d' % (s.running, s.size),
                     age,
                     ukn(s.container_image_name),
-                    ukn(s.container_image_id)[0:12]))
+                    ukn(s.container_image_id)[0:12],
+                    ukn(s.spec_presence)
+                ))

             return HandleCommandResult(stdout=table.get_string())
@@ -507,7 +509,7 @@ Usage:
         placement = PlacementSpec(label=label, count=num, hosts=hosts)
         placement.validate()

-        spec = ServiceSpec(placement=placement)
+        spec = ServiceSpec(placement=placement, service_type='mon')

         completion = self.add_mon(spec)
         self._orchestrator_wait([completion])
@@ -527,6 +529,15 @@ Usage:
         raise_if_exception(completion)
         return HandleCommandResult(stdout=completion.result_str())

+    @_cli_write_command(
+        'orch apply',
+        desc='Applies a Service Specification from a file: ceph orch apply -i $file')
+    def _apply_services(self, inbuf):
+        completion = self.apply_service_config(inbuf)
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        return HandleCommandResult(stdout=completion.result_str())
+
     @_cli_write_command(
         'orch daemon add rbd-mirror',
         "name=num,type=CephInt,req=false "
@@ -718,6 +729,26 @@ Usage:
         raise_if_exception(completion)
         return HandleCommandResult(stdout=completion.result_str())

+    @_cli_write_command(
+        'orch spec dump',
+        desc='List all Service specs')
+    def _get_service_specs(self):
+        completion = self.list_specs()
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        specs = completion.result_str()
+        return HandleCommandResult(stdout=specs)
+
+    @_cli_write_command(
+        'orch servicespecs clear',
+        desc='Clear all Service specs')
+    def _clear_service_specs(self):
+        completion = self.clear_all_specs()
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        return HandleCommandResult(stdout=completion.result_str())
+
     @_cli_write_command(
         'orch apply mgr',
         "name=num,type=CephInt,req=false "
@@ -729,7 +760,7 @@ Usage:
             label=label, count=num, hosts=hosts)
         placement.validate()

-        spec = ServiceSpec(placement=placement)
+        spec = ServiceSpec(placement=placement, service_type='mgr')

         completion = self.apply_mgr(spec)
         self._orchestrator_wait([completion])
@@ -766,11 +797,10 @@ Usage:
     def _apply_mds(self, fs_name, num=None, label=None, hosts=[]):
         placement = PlacementSpec(label=label, count=num, hosts=hosts)
         placement.validate()
-
         spec = ServiceSpec(
-            fs_name,
+            service_type='mds',
+            name=fs_name,
             placement=placement)
-
         completion = self.apply_mds(spec)
         self._orchestrator_wait([completion])
         raise_if_exception(completion)
@@ -784,7 +814,8 @@ Usage:
                'Update the number of rbd-mirror instances')
     def _apply_rbd_mirror(self, num, label=None, hosts=[]):
         spec = ServiceSpec(
-            placement=PlacementSpec(hosts=hosts, count=num, label=label))
+            placement=PlacementSpec(hosts=hosts, count=num, label=label),
+            service_type='rbd-mirror')
         completion = self.apply_rbd_mirror(spec)
         self._orchestrator_wait([completion])
         raise_if_exception(completion)
@@ -800,6 +831,7 @@ Usage:
         'Update the number of RGW instances for the given zone')
     def _apply_rgw(self, zone_name, realm_name, num=None, label=None, hosts=[]):
         spec = RGWSpec(
+            service_type='rgw',
             rgw_realm=realm_name,
             rgw_zone=zone_name,
             placement=PlacementSpec(hosts=hosts, label=label, count=num))
@@ -835,6 +867,7 @@ Usage:
         # type: (Optional[int], Optional[str], List[str]) -> HandleCommandResult
         spec = ServiceSpec(
             placement=PlacementSpec(label=label, hosts=hosts, count=num),
+            service_type='prometheus'
         )
         completion = self.apply_prometheus(spec)
         self._orchestrator_wait([completion])
@@ -850,6 +883,7 @@ Usage:
         # type: (Optional[int], Optional[str], List[str]) -> HandleCommandResult
         spec = ServiceSpec(
             placement=PlacementSpec(label=label, hosts=hosts, count=num),
+            service_type='node-exporter'
         )
         completion = self.apply_node_exporter(spec)
         self._orchestrator_wait([completion])

diff --git a/src/pybind/mgr/tests/test_orchestrator.py b/src/pybind/mgr/tests/test_orchestrator.py
index 3b5163604157..3fa5fc8d53fe 100644
--- a/src/pybind/mgr/tests/test_orchestrator.py
+++ b/src/pybind/mgr/tests/test_orchestrator.py
@@ -117,6 +117,7 @@ def test_rgwspec():
     """
     {
         "rgw_zone": "zonename",
+        "service_type": "rgw",
         "rgw_frontend_port": 8080,
         "rgw_zonegroup": "group",
         "rgw_zone_user": "user",
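Finally, the shape stored on disk: to_json() above serializes the spec's
__dict__, and from_json() revives it, which is why service_type is now a
mandatory field, as the updated test_rgwspec fixture above reflects. An
illustrative round trip with made-up values:

    import json

    raw = json.dumps({
        'service_type': 'rgw',    # mandatory since this patch
        'rgw_realm': 'realm',
        'rgw_zone': 'zonename',
        'placement': {'count': 1},
    })
    spec_dict = json.loads(raw)
    assert spec_dict['service_type'] == 'rgw'
    # in the mgr this dict is handed to ServiceSpec.from_json(), which
    # dispatches to RGWSpec because service_type == 'rgw'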