From: Sage Weil Date: Fri, 28 Feb 2020 22:38:26 +0000 (-0600) Subject: mgr/cephadm: replace PersistentStoreDict with SpecStore X-Git-Tag: v15.1.1~191^2~11 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=1980250ab81ee2105143511d9a3dfc735c3778b2;p=ceph-ci.git mgr/cephadm: replace PersistentStoreDict with SpecStore Explicit implementation of the dict of specs. Signed-off-by: Sage Weil --- diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 9c4c4709c16..0c4fda57dc7 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -26,7 +26,6 @@ import re import shutil import subprocess import uuid -from mgr_module import PersistentStoreDict from ceph.deployment import inventory, translate from ceph.deployment.drive_group import DriveGroupSpec @@ -64,6 +63,7 @@ DEFAULT_SSH_CONFIG = ('Host *\n' DATEFMT = '%Y-%m-%dT%H:%M:%S.%f' HOST_CACHE_PREFIX = "host." +SPEC_STORE_PREFIX = "spec." # for py2 compat try: @@ -126,6 +126,46 @@ def assert_valid_host(name): raise OrchestratorError(e) +class SpecStore(): + def __init__(self, mgr): + # type: (CephadmOrchestrator) -> None + self.mgr = mgr + self.specs = {} # type: Dict[str, orchestrator.ServiceSpec] + + def load(self): + # type: () -> None + for k, v in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)): + service_name = k[len(SPEC_STORE_PREFIX):] + try: + spec = ServiceSpec.from_json(json.loads(v)) + self.specs[service_name] = spec + self.mgr.log.debug('SpecStore: loaded spec for %s' % ( + service_name)) + except Exception as e: + self.mgr.log.warning('unable to load spec for %s: %s' % ( + service_name, e)) + pass + + def save(self, spec): + # type: (orchestrator.ServiceSpec) -> None + self.specs[spec.service_name()] = spec + self.mgr.set_store(SPEC_STORE_PREFIX + spec.service_name(), + spec.to_json()) + + def rm(self, service_name): + # type: (str) -> None + if service_name in self.specs: + del self.specs[service_name] + self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None) + + def find(self, service_name): + # type: (str) -> List[orchestrator.ServiceSpec] + specs = [] + for sn, spec in self.specs.items(): + if sn == service_name or sn.startswith(service_name + '.'): + specs.append(spec) + return specs + class HostCache(): def __init__(self, mgr): # type: (CephadmOrchestrator) -> None @@ -586,7 +626,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.osd_removal_report: dict = dict() self.rm_util = RemoveUtil(self) - self.service_spec_store = PersistentStoreDict(self, 'service_spec') + self.spec_store = SpecStore(self) + self.spec_store.load() # ensure the host lists are in sync for h in self.inventory.keys(): @@ -1605,10 +1646,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): n: str = dd.service_name() if service_name and service_name != n: continue - try: - _ = self.service_spec_store[dd.service_name()] + if dd.service_name() in self.spec_store.specs: spec_presence = "present" - except IndexError: + else: spec_presence = "absent" if dd.daemon_type == 'osd': spec_presence = "not applicable" @@ -1723,7 +1763,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): args.append( (d.name(), d.hostname, force) ) - self._remove_key_from_store(d.service_name()) + self.spec_store.rm(d.service_name()) if not args: raise OrchestratorError('Unable to find daemons in %s service' % ( service_name)) @@ -1731,10 +1771,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): service_name, [a[0] for a in args])) return self._remove_daemon(args) - def _remove_key_from_store(self, spec_name: str) -> None: - self.log.debug(f"Removing {spec_name} from the service_spec store") - del self.service_spec_store[spec_name] - def get_inventory(self, host_filter=None, refresh=False): """ Return the storage inventory of hosts matching the given filter. @@ -2153,7 +2189,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): return self._add_daemon('mgr', spec, self._create_mgr) def apply_mgr(self, spec): - self.save_spec(spec) + self.spec_store.save(spec) self._kick_serve_loop() return trivial_result("Scheduled MGR creation..") @@ -2166,7 +2202,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): return self._add_daemon('mds', spec, self._create_mds, self._config_mds) def apply_mds(self, spec: orchestrator.ServiceSpec) -> orchestrator.Completion: - self.save_spec(spec) + self.spec_store.save(spec) self._kick_serve_loop() return trivial_result("Scheduled MDS creation..") @@ -2227,7 +2263,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): return self._create_daemon('rgw', rgw_id, host, keyring=keyring) def apply_rgw(self, spec): - self.save_spec(spec) + self.spec_store.save(spec) self._kick_serve_loop() return trivial_result("Scheduled RGW creation..") @@ -2251,7 +2287,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): keyring=keyring) def apply_rbd_mirror(self, spec): - self.save_spec(spec) + self.spec_store.save(spec) self._kick_serve_loop() return trivial_result("Scheduled rbd-mirror creation..") @@ -2460,7 +2496,7 @@ receivers: return self._apply_service('prometheus', spec, self._create_prometheus) def apply_prometheus(self, spec): - self.save_spec(spec) + self.spec_store.save(spec) self._kick_serve_loop() return trivial_result("Scheduled prometheus creation..") @@ -2470,7 +2506,7 @@ receivers: self._create_node_exporter) def apply_node_exporter(self, spec): - self.save_spec(spec) + self.spec_store.save(spec) self._kick_serve_loop() return trivial_result("Scheduled node-exporter creation..") @@ -2736,34 +2772,12 @@ receivers: """ return trivial_result(self.osd_removal_report) - def save_spec(self, spec: ServiceSpec) -> None: - """ - Attempts to save a ServiceSpec. - - There are two ways to manipulate the service_specs stored in the mon_store - 1) Using a global definition (i.e. `ceph orch apply -i `) - This will usually contain multiple definitions of services and daemons. - - 2) Using the CLI for specific service types (i.e. `ceph orch apply rgw 3 --realm foo --zone bar`) - - Raises `OrchestratorError` if an entry with the same named identifier already exist to prevent overwrites from two different sources. - """ - full_spec_name = f"{spec.service_type}.{spec.name}" - try: - _ = self.service_spec_store[full_spec_name] - raise orchestrator.OrchestratorError(f"Specification for {full_spec_name} already exists. " - "Please review your existing specs with " - "`ceph orch spec dump` and try again.") - except IndexError: - self.log.info(f"New spec found. Saving <{full_spec_name}> to the store.") - _ = self.service_spec_store[full_spec_name] = spec.to_json() - def list_specs(self) -> orchestrator.Completion: """ Loads all entries from the service_spec mon_store root. """ specs = list() - for _, spec in self.service_spec_store.items(): + for service_name, spec in self.spec_store.specs.items(): specs.append('---') specs.append(yaml.dump(spec)) return trivial_result(specs) @@ -2773,7 +2787,7 @@ receivers: Parse a multi document yaml file (represented in a inbuf object) and loads it with it's respective ServiceSpec to validate the initial input. - If no errors are raised `save_spec` is called. + If no errors are raised, save them. """ content: Iterator[Any] = yaml.load_all(spec_document) # Load all specs from a multi document yaml file. @@ -2783,18 +2797,19 @@ receivers: spec_o = ServiceSpec.from_json(spec) loaded_specs.append(spec_o) for spec in loaded_specs: - self.save_spec(spec) + self.spec_store.save(spec) self._kick_serve_loop() return trivial_result("ServiceSpecs saved") - def trigger_deployment(self, service_name: str, + def trigger_deployment(self, + service_name: str, func: Callable[[ServiceSpec], orchestrator.Completion]) -> List[orchestrator.Completion]: """ Triggers a corresponding deployment method `func` to `service_name` Services can have multiple entries. (i.e. different RGW configurations) """ self.log.debug(f"starting async {service_name} deployment") - specs = self.find_json_specs(service_name) + specs = self.spec_store.find(service_name) completions = list() for spec in specs: completions.append(func(spec)) @@ -2802,29 +2817,6 @@ receivers: return completions return [trivial_result("Nothing to do..")] - def find_json_specs(self, find_service_type: str) -> List[ServiceSpec]: - """ - Inspects the mon_store and gathers entries for the `find_service_type` - (i.e. 'mgr', 'rgw') service. - Some services have individual ServiceSpecs (rgw->RGWSpec, nfs->NFSServiceSpec) - so we need to make the distinction. - """ - specs = list() - self.log.debug(f"Checking for type {find_service_type}") - for spec_key, json_specs in self.service_spec_store.items(): - if not spec_key.split('.')[0].startswith(find_service_type): - continue - if isinstance(json_specs, dict): - self.log.debug(f"Found dict in json_specs: No need to decode") - elif isinstance(json_specs, str): - self.log.debug(f"Found str in json_specs: Decoding from json") - json_specs = json.loads(json_specs) - - self.log.debug(f"Found service_type: {spec_key} in the k-v store. Adding..") - specs.append(ServiceSpec.from_json(json_specs)) - self.log.debug(f"Found {specs} specs.") - return specs - def _apply_services(self) -> List[orchestrator.Completion]: """ This is a method that is supposed to run continuously in the diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index bdcdb32cbb6..c629613ba5c 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -285,8 +285,7 @@ class TestCephadm(object): )) @mock.patch("cephadm.module.HostCache.save_host") @mock.patch("cephadm.module.HostCache.rm_host") - @mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store") - def test_remove_daemon(self, rm_key, _rm_host, _save_spec, cephadm_module): + def test_remove_daemon(self, _rm_host, _save_spec, cephadm_module): with self._with_host(cephadm_module, 'test'): c = cephadm_module.list_daemons(refresh=True) wait(cephadm_module, c) @@ -308,8 +307,8 @@ class TestCephadm(object): )) @mock.patch("cephadm.module.HostCache.save_host") @mock.patch("cephadm.module.HostCache.rm_host") - @mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store") - def test_remove_service(self, _rm_key, _rm_host, _save_spec, cephadm_module): + @mock.patch("cephadm.module.SpecStore.rm") + def test_remove_service(self, _rm_spec, _rm_host, _save_spec, cephadm_module): with self._with_host(cephadm_module, 'test'): c = cephadm_module.list_daemons(refresh=True) wait(cephadm_module, c) @@ -406,7 +405,7 @@ class TestCephadm(object): @mock.patch("cephadm.module.CephadmOrchestrator.send_command") @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command) @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") - @mock.patch("cephadm.module.CephadmOrchestrator.save_spec") + @mock.patch("cephadm.module.SpecStore.save") @mock.patch("cephadm.module.HostCache.save_host") @mock.patch("cephadm.module.HostCache.rm_host") def test_apply_mgr_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module): @@ -421,7 +420,7 @@ class TestCephadm(object): @mock.patch("cephadm.module.CephadmOrchestrator.send_command") @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command) @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") - @mock.patch("cephadm.module.CephadmOrchestrator.save_spec") + @mock.patch("cephadm.module.SpecStore.save") @mock.patch("cephadm.module.HostCache.save_host") @mock.patch("cephadm.module.HostCache.rm_host") def test_apply_mds_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module): @@ -436,7 +435,7 @@ class TestCephadm(object): @mock.patch("cephadm.module.CephadmOrchestrator.send_command") @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command) @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") - @mock.patch("cephadm.module.CephadmOrchestrator.save_spec") + @mock.patch("cephadm.module.SpecStore.save") @mock.patch("cephadm.module.HostCache.save_host") @mock.patch("cephadm.module.HostCache.rm_host") def test_apply_rgw_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module): @@ -451,7 +450,7 @@ class TestCephadm(object): @mock.patch("cephadm.module.CephadmOrchestrator.send_command") @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command) @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") - @mock.patch("cephadm.module.CephadmOrchestrator.save_spec") + @mock.patch("cephadm.module.SpecStore.save") @mock.patch("cephadm.module.HostCache.save_host") @mock.patch("cephadm.module.HostCache.rm_host") def test_apply_rbd_mirror_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module): @@ -466,7 +465,7 @@ class TestCephadm(object): @mock.patch("cephadm.module.CephadmOrchestrator.send_command") @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command) @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") - @mock.patch("cephadm.module.CephadmOrchestrator.save_spec") + @mock.patch("cephadm.module.SpecStore.save") @mock.patch("cephadm.module.HostCache.save_host") @mock.patch("cephadm.module.HostCache.rm_host") def test_apply_prometheus_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module): @@ -481,7 +480,7 @@ class TestCephadm(object): @mock.patch("cephadm.module.CephadmOrchestrator.send_command") @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command) @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") - @mock.patch("cephadm.module.CephadmOrchestrator.save_spec") + @mock.patch("cephadm.module.SpecStore.save") @mock.patch("cephadm.module.HostCache.save_host") @mock.patch("cephadm.module.HostCache.rm_host") def test_apply_node_exporter_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module): @@ -496,7 +495,7 @@ class TestCephadm(object): @mock.patch("cephadm.module.CephadmOrchestrator.send_command") @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command) @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") - @mock.patch("cephadm.module.CephadmOrchestrator.save_spec") + @mock.patch("cephadm.module.SpecStore.save") @mock.patch("cephadm.module.HostCache.save_host") @mock.patch("cephadm.module.HostCache.rm_host") @mock.patch("cephadm.module.yaml.load_all", return_value=[{'service_type': 'rgw', 'placement': {'count': 1}, 'spec': {'rgw_realm': 'realm1', 'rgw_zone': 'zone1'}}]) @@ -515,12 +514,12 @@ class TestCephadm(object): @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") @mock.patch("cephadm.module.HostCache.save_host") @mock.patch("cephadm.module.HostCache.rm_host") - @mock.patch("cephadm.module.CephadmOrchestrator.find_json_specs") - def test_trigger_deployment_todo(self, _find_json_spec, _send_command, _get_connection, _save_host, _rm_host, cephadm_module): + @mock.patch("cephadm.module.SpecStore.find") + def test_trigger_deployment_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module): with self._with_host(cephadm_module, 'test'): - _find_json_spec.return_value = ['something'] + _find.return_value = ['something'] c = cephadm_module.trigger_deployment('foo', lambda x: x) - _find_json_spec.assert_called_with('foo') + _find.assert_called_with('foo') assert c == ['something'] @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) @@ -529,12 +528,12 @@ class TestCephadm(object): @mock.patch("cephadm.module.CephadmOrchestrator._get_connection") @mock.patch("cephadm.module.HostCache.save_host") @mock.patch("cephadm.module.HostCache.rm_host") - @mock.patch("cephadm.module.CephadmOrchestrator.find_json_specs") - def test_trigger_deployment_no_todo(self, _find_json_spec, _send_command, _get_connection, _save_host, _rm_host, cephadm_module): + @mock.patch("cephadm.module.SpecStore.find") + def test_trigger_deployment_no_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module): with self._with_host(cephadm_module, 'test'): - _find_json_spec.return_value = [] + _find.return_value = [] c = cephadm_module.trigger_deployment('foo', lambda x: x) - _find_json_spec.assert_called_with('foo') + _find.assert_called_with('foo') assert wait(cephadm_module, c[0]) == 'Nothing to do..' @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}')) diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 1fe44aee630..f24a7b36d95 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -1577,6 +1577,12 @@ class ServiceSpec(object): args.update({k: v}) return _cls(**args) # type: ignore + def service_name(self): + n = self.service_type + if self.name: + n += '.' + self.name + return n + def to_json(self): return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)