]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: replace PersistentStoreDict with SpecStore
authorSage Weil <sage@redhat.com>
Fri, 28 Feb 2020 22:38:26 +0000 (16:38 -0600)
committerSage Weil <sage@redhat.com>
Sun, 1 Mar 2020 14:10:54 +0000 (08:10 -0600)
Explicit implementation of the dict of specs.

Signed-off-by: Sage Weil <sage@redhat.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/orchestrator/_interface.py

index 9c4c4709c163e7ba29155b9d6343b33a8290d1c7..0c4fda57dc754fec01f5bc72addb6c8ef1ac0cd9 100644 (file)
@@ -26,7 +26,6 @@ import re
 import shutil
 import subprocess
 import uuid
-from mgr_module import PersistentStoreDict
 
 from ceph.deployment import inventory, translate
 from ceph.deployment.drive_group import DriveGroupSpec
@@ -64,6 +63,7 @@ DEFAULT_SSH_CONFIG = ('Host *\n'
 DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
 
 HOST_CACHE_PREFIX = "host."
+SPEC_STORE_PREFIX = "spec."
 
 # for py2 compat
 try:
@@ -126,6 +126,46 @@ def assert_valid_host(name):
         raise OrchestratorError(e)
 
 
+class SpecStore():
+    def __init__(self, mgr):
+        # type: (CephadmOrchestrator) -> None
+        self.mgr = mgr
+        self.specs = {} # type: Dict[str, orchestrator.ServiceSpec]
+
+    def load(self):
+        # type: () -> None
+        for k, v in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)):
+            service_name = k[len(SPEC_STORE_PREFIX):]
+            try:
+                spec = ServiceSpec.from_json(json.loads(v))
+                self.specs[service_name] = spec
+                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
+                    service_name))
+            except Exception as e:
+                self.mgr.log.warning('unable to load spec for %s: %s' % (
+                    service_name, e))
+                pass
+
+    def save(self, spec):
+        # type: (orchestrator.ServiceSpec) -> None
+        self.specs[spec.service_name()] = spec
+        self.mgr.set_store(SPEC_STORE_PREFIX + spec.service_name(),
+                           spec.to_json())
+
+    def rm(self, service_name):
+        # type: (str) -> None
+        if service_name in self.specs:
+            del self.specs[service_name]
+            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
+
+    def find(self, service_name):
+        # type: (str) -> List[orchestrator.ServiceSpec]
+        specs = []
+        for sn, spec in self.specs.items():
+            if sn == service_name or sn.startswith(service_name + '.'):
+                specs.append(spec)
+        return specs
+
 class HostCache():
     def __init__(self, mgr):
         # type: (CephadmOrchestrator) -> None
@@ -586,7 +626,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.osd_removal_report: dict = dict()
         self.rm_util = RemoveUtil(self)
 
-        self.service_spec_store = PersistentStoreDict(self, 'service_spec')
+        self.spec_store = SpecStore(self)
+        self.spec_store.load()
 
         # ensure the host lists are in sync
         for h in self.inventory.keys():
@@ -1605,10 +1646,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 n: str = dd.service_name()
                 if service_name and service_name != n:
                     continue
-                try:
-                    _ = self.service_spec_store[dd.service_name()]
+                if dd.service_name() in self.spec_store.specs:
                     spec_presence = "present"
-                except IndexError:
+                else:
                     spec_presence = "absent"
                 if dd.daemon_type == 'osd':
                     spec_presence = "not applicable"
@@ -1723,7 +1763,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                     args.append(
                         (d.name(), d.hostname, force)
                     )
-                    self._remove_key_from_store(d.service_name())
+                    self.spec_store.rm(d.service_name())
         if not args:
             raise OrchestratorError('Unable to find daemons in %s service' % (
                 service_name))
@@ -1731,10 +1771,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             service_name, [a[0] for a in args]))
         return self._remove_daemon(args)
 
-    def _remove_key_from_store(self, spec_name: str) -> None:
-        self.log.debug(f"Removing {spec_name} from the service_spec store")
-        del self.service_spec_store[spec_name]
-
     def get_inventory(self, host_filter=None, refresh=False):
         """
         Return the storage inventory of hosts matching the given filter.
@@ -2153,7 +2189,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         return self._add_daemon('mgr', spec, self._create_mgr)
 
     def apply_mgr(self, spec):
-        self.save_spec(spec)
+        self.spec_store.save(spec)
         self._kick_serve_loop()
         return trivial_result("Scheduled MGR creation..")
 
@@ -2166,7 +2202,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
 
     def apply_mds(self, spec: orchestrator.ServiceSpec) -> orchestrator.Completion:
-        self.save_spec(spec)
+        self.spec_store.save(spec)
         self._kick_serve_loop()
         return trivial_result("Scheduled MDS creation..")
 
@@ -2227,7 +2263,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
 
     def apply_rgw(self, spec):
-        self.save_spec(spec)
+        self.spec_store.save(spec)
         self._kick_serve_loop()
         return trivial_result("Scheduled RGW creation..")
 
@@ -2251,7 +2287,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                                    keyring=keyring)
 
     def apply_rbd_mirror(self, spec):
-        self.save_spec(spec)
+        self.spec_store.save(spec)
         self._kick_serve_loop()
         return trivial_result("Scheduled rbd-mirror creation..")
 
@@ -2460,7 +2496,7 @@ receivers:
         return self._apply_service('prometheus', spec, self._create_prometheus)
 
     def apply_prometheus(self, spec):
-        self.save_spec(spec)
+        self.spec_store.save(spec)
         self._kick_serve_loop()
         return trivial_result("Scheduled prometheus creation..")
 
@@ -2470,7 +2506,7 @@ receivers:
                                 self._create_node_exporter)
 
     def apply_node_exporter(self, spec):
-        self.save_spec(spec)
+        self.spec_store.save(spec)
         self._kick_serve_loop()
         return trivial_result("Scheduled node-exporter creation..")
 
@@ -2736,34 +2772,12 @@ receivers:
         """
         return trivial_result(self.osd_removal_report)
 
-    def save_spec(self, spec: ServiceSpec) -> None:
-        """
-        Attempts to save a ServiceSpec.
-
-        There are two ways to manipulate the service_specs stored in the mon_store
-        1) Using a global definition (i.e. `ceph orch apply -i <specs_file.yaml>`)
-            This will usually contain multiple definitions of services and daemons.
-
-        2) Using the CLI for specific service types (i.e. `ceph orch apply rgw 3 --realm foo --zone bar`)
-
-         Raises `OrchestratorError` if an entry with the same named identifier already exist to prevent overwrites from two different sources.
-        """
-        full_spec_name = f"{spec.service_type}.{spec.name}"
-        try:
-            _ = self.service_spec_store[full_spec_name]
-            raise orchestrator.OrchestratorError(f"Specification for {full_spec_name} already exists. "
-                                                 "Please review your existing specs with "
-                                                 "`ceph orch spec dump` and try again.")
-        except IndexError:
-            self.log.info(f"New spec found. Saving <{full_spec_name}> to the store.")
-            _ = self.service_spec_store[full_spec_name] = spec.to_json()
-
     def list_specs(self) -> orchestrator.Completion:
         """
         Loads all entries from the service_spec mon_store root.
         """
         specs = list()
-        for _, spec in self.service_spec_store.items():
+        for service_name, spec in self.spec_store.specs.items():
             specs.append('---')
             specs.append(yaml.dump(spec))
         return trivial_result(specs)
@@ -2773,7 +2787,7 @@ receivers:
         Parse a multi document yaml file (represented in a inbuf object)
         and loads it with it's respective ServiceSpec to validate the
         initial input.
-        If no errors are raised `save_spec` is called.
+        If no errors are raised, save them.
         """
         content: Iterator[Any] = yaml.load_all(spec_document)
         # Load all specs from a multi document yaml file.
@@ -2783,18 +2797,19 @@ receivers:
             spec_o = ServiceSpec.from_json(spec)
             loaded_specs.append(spec_o)
         for spec in loaded_specs:
-            self.save_spec(spec)
+            self.spec_store.save(spec)
         self._kick_serve_loop()
         return trivial_result("ServiceSpecs saved")
 
-    def trigger_deployment(self, service_name: str,
+    def trigger_deployment(self,
+                           service_name: str,
                            func: Callable[[ServiceSpec], orchestrator.Completion]) -> List[orchestrator.Completion]:
         """
         Triggers a corresponding deployment method `func` to `service_name`
         Services can have multiple entries. (i.e. different RGW configurations)
         """
         self.log.debug(f"starting async {service_name} deployment")
-        specs = self.find_json_specs(service_name)
+        specs = self.spec_store.find(service_name)
         completions = list()
         for spec in specs:
             completions.append(func(spec))
@@ -2802,29 +2817,6 @@ receivers:
             return completions
         return [trivial_result("Nothing to do..")]
 
-    def find_json_specs(self, find_service_type: str) -> List[ServiceSpec]:
-        """
-        Inspects the mon_store and gathers entries for the `find_service_type`
-        (i.e. 'mgr', 'rgw') service.
-        Some services have individual ServiceSpecs (rgw->RGWSpec, nfs->NFSServiceSpec)
-        so we need to make the distinction.
-        """
-        specs = list()
-        self.log.debug(f"Checking for type {find_service_type}")
-        for spec_key, json_specs in self.service_spec_store.items():
-            if not spec_key.split('.')[0].startswith(find_service_type):
-                continue
-            if isinstance(json_specs, dict):
-                self.log.debug(f"Found dict in json_specs: No need to decode")
-            elif isinstance(json_specs, str):
-                self.log.debug(f"Found str in json_specs: Decoding from json")
-                json_specs = json.loads(json_specs)
-
-            self.log.debug(f"Found service_type: {spec_key} in the k-v store. Adding..")
-            specs.append(ServiceSpec.from_json(json_specs))
-        self.log.debug(f"Found {specs} specs.")
-        return specs
-
     def _apply_services(self) -> List[orchestrator.Completion]:
         """
         This is a method that is supposed to run continuously in the
index bdcdb32cbb6b2465c2cc6eb8cccb32001b114bf9..c629613ba5c570a7dc755856aa43f85844d21a7f 100644 (file)
@@ -285,8 +285,7 @@ class TestCephadm(object):
     ))
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
-    def test_remove_daemon(self, rm_key, _rm_host, _save_spec, cephadm_module):
+    def test_remove_daemon(self, _rm_host, _save_spec, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
             wait(cephadm_module, c)
@@ -308,8 +307,8 @@ class TestCephadm(object):
     ))
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
-    def test_remove_service(self, _rm_key, _rm_host, _save_spec, cephadm_module):
+    @mock.patch("cephadm.module.SpecStore.rm")
+    def test_remove_service(self, _rm_spec, _rm_host, _save_spec, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
             wait(cephadm_module, c)
@@ -406,7 +405,7 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+    @mock.patch("cephadm.module.SpecStore.save")
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
     def test_apply_mgr_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@@ -421,7 +420,7 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+    @mock.patch("cephadm.module.SpecStore.save")
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
     def test_apply_mds_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@@ -436,7 +435,7 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+    @mock.patch("cephadm.module.SpecStore.save")
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
     def test_apply_rgw_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@@ -451,7 +450,7 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+    @mock.patch("cephadm.module.SpecStore.save")
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
     def test_apply_rbd_mirror_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@@ -466,7 +465,7 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+    @mock.patch("cephadm.module.SpecStore.save")
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
     def test_apply_prometheus_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@@ -481,7 +480,7 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+    @mock.patch("cephadm.module.SpecStore.save")
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
     def test_apply_node_exporter_save(self, _send_command, _get_connection, _save_spec, _save_host, _rm_host, cephadm_module):
@@ -496,7 +495,7 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator.send_command")
     @mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
-    @mock.patch("cephadm.module.CephadmOrchestrator.save_spec")
+    @mock.patch("cephadm.module.SpecStore.save")
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
     @mock.patch("cephadm.module.yaml.load_all", return_value=[{'service_type': 'rgw', 'placement': {'count': 1}, 'spec': {'rgw_realm': 'realm1', 'rgw_zone': 'zone1'}}])
@@ -515,12 +514,12 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator.find_json_specs")
-    def test_trigger_deployment_todo(self, _find_json_spec, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
+    @mock.patch("cephadm.module.SpecStore.find")
+    def test_trigger_deployment_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
-            _find_json_spec.return_value = ['something']
+            _find.return_value = ['something']
             c = cephadm_module.trigger_deployment('foo', lambda x: x)
-            _find_json_spec.assert_called_with('foo')
+            _find.assert_called_with('foo')
             assert c == ['something']
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@@ -529,12 +528,12 @@ class TestCephadm(object):
     @mock.patch("cephadm.module.CephadmOrchestrator._get_connection")
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
-    @mock.patch("cephadm.module.CephadmOrchestrator.find_json_specs")
-    def test_trigger_deployment_no_todo(self, _find_json_spec, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
+    @mock.patch("cephadm.module.SpecStore.find")
+    def test_trigger_deployment_no_todo(self, _find, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
-            _find_json_spec.return_value = []
+            _find.return_value = []
             c = cephadm_module.trigger_deployment('foo', lambda x: x)
-            _find_json_spec.assert_called_with('foo')
+            _find.assert_called_with('foo')
             assert wait(cephadm_module, c[0]) == 'Nothing to do..'
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
index 1fe44aee63030e17ce1ea65fcb42c6b6d00bbf8d..f24a7b36d9540248db44095ba18a35a1612fdb91 100644 (file)
@@ -1577,6 +1577,12 @@ class ServiceSpec(object):
             args.update({k: v})
         return _cls(**args)  # type: ignore
 
+    def service_name(self):
+        n = self.service_type
+        if self.name:
+            n += '.' + self.name
+        return n
+
     def to_json(self):
         return json.dumps(self, default=lambda o: o.__dict__,
                           sort_keys=True, indent=4)