]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: leverage service specs
authorJoshua Schmid <jschmid@suse.de>
Wed, 26 Feb 2020 13:26:42 +0000 (14:26 +0100)
committerSage Weil <sage@redhat.com>
Sun, 1 Mar 2020 14:10:54 +0000 (08:10 -0600)
Fixes: https://tracker.ceph.com/issues/44205
This does a couple of things:

* Change the way apply_$service() works:

Instead of triggering the deployment mechanism it will rather
transform the already passed ServiceSpec into a json representation
and save it in a persistent mon_store section.

`mgr/cephadm/service_spec/$service|daemon_type/service_name`

These locations will be periodically checked in the serve() thread.
This works since all the apply_$service_type functions are idempotent.

* Allow to save a config-like specification in the mon_store.

`ceph orch apply -i <service_spec_file.yaml>`

will read the specified services and save them in the mon store
section like mentioned above. The same serve() mechanism like above
also applies to deployment.

Signed-off-by: Joshua Schmid <jschmid@suse.de>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/mgr_module.py
src/pybind/mgr/orchestrator/_interface.py
src/pybind/mgr/orchestrator/module.py
src/pybind/mgr/tests/test_orchestrator.py

index ea0a34fa1ff0e22b0ccf7053126498c0851ba0db..551dac3764c1e6d405e3207fa9dd513458fc46c1 100644 (file)
@@ -2,6 +2,7 @@ import json
 import errno
 import logging
 import time
+import yaml
 from threading import Event
 from functools import wraps
 
@@ -9,7 +10,7 @@ from mgr_util import create_self_signed_cert
 
 import string
 try:
-    from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple
+    from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple, Iterator
     from typing import TYPE_CHECKING
 except ImportError:
     TYPE_CHECKING = False  # just for type checking
@@ -25,16 +26,16 @@ import re
 import shutil
 import subprocess
 import uuid
+from mgr_module import PersistentStoreDict
 
 from ceph.deployment import inventory, translate
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.drive_selection import selector
 
 from mgr_module import MgrModule
-import mgr_util
 import orchestrator
 from orchestrator import OrchestratorError, HostPlacementSpec, OrchestratorValidationError, HostSpec, \
-    CLICommandMeta
+    CLICommandMeta, ServiceSpec
 
 from . import remotes
 from ._utils import RemoveUtil
@@ -585,6 +586,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.osd_removal_report: dict = dict()
         self.rm_util = RemoveUtil(self)
 
+        self.service_spec_store = PersistentStoreDict(self, 'service_spec')
+
         # ensure the host lists are in sync
         for h in self.inventory.keys():
             if h not in self.cache.daemons:
@@ -952,6 +955,18 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         while self.run:
             self._check_hosts()
             self._remove_osds_bg()
+            completions = self._apply_services()
+            for completion in completions:
+                if completion:
+                    while not completion.has_result:
+                        self.process([completion])
+                        self.log.debug(f'Still processing {completion}')
+                        if completion.needs_result:
+                            time.sleep(1)
+                        else:
+                            break
+                    if completion.exception is not None:
+                        self.log.error(str(completion.exception))
 
             # refresh daemons
             self.log.debug('refreshing hosts')
@@ -1579,20 +1594,31 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             # ugly sync path, FIXME someday perhaps?
             for host, hi in self.inventory.items():
                 self._refresh_host_daemons(host)
+        # <service_map>
         sm = {}  # type: Dict[str, orchestrator.ServiceDescription]
         for h, dm in self.cache.daemons.items():
             for name, dd in dm.items():
                 if service_type and service_type != dd.daemon_type:
                     continue
-                n = dd.service_name()
+                # <name> i.e. rgw.realm.zone
+                n: str = dd.service_name()
                 if service_name and service_name != n:
                     continue
+                service_store = PersistentStoreDict(self, f'service_spec/{dd.daemon_type}')
+                try:
+                    _ = service_store[dd.service_name(without_service_type=True)]
+                    spec_presence = "present"
+                except IndexError:
+                    spec_presence = "absent"
+                if dd.daemon_type == 'osd':
+                    spec_presence = "not applicable"
                 if n not in sm:
                     sm[n] = orchestrator.ServiceDescription(
                         service_name=n,
                         last_refresh=dd.last_refresh,
                         container_image_id=dd.container_image_id,
                         container_image_name=dd.container_image_name,
+                        spec_presence=spec_presence,
                     )
                 sm[n].size += 1
                 if dd.status == 1:
@@ -1684,6 +1710,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             for name in names:
                 if name in dm:
                     args.append((name, host, force))
+                    # TODO: bail out for OSDs when https://github.com/ceph/ceph/pull/32983 is merged
+                    if dm[name].daemon_type == 'osd':
+                        continue
+                    self._remove_key_from_store(dm[name].daemon_type,
+                                                dm[name].service_name(without_service_type=True))
         if not args:
             raise OrchestratorError('Unable to find daemon(s) %s' % (names))
         self.log.info('Remove daemons %s' % [a[0] for a in args])
@@ -1697,6 +1728,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                     args.append(
                         (d.name(), d.hostname, force)
                     )
+                    self._remove_key_from_store(d.daemon_type, d.service_name(without_service_type=True))
         if not args:
             raise OrchestratorError('Unable to find daemons in %s service' % (
                 service_name))
@@ -1704,6 +1736,14 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             service_name, [a[0] for a in args]))
         return self._remove_daemon(args)
 
+    def _remove_key_from_store(self, service_type: str, name: str) -> None:
+        self.log.debug(f"Removing {name} from the service_spec store")
+        try:
+            store = PersistentStoreDict(self, f'service_spec/{service_type}')
+            del store[name]
+        except IndexError:
+            self.log.debug(f"{service_type}/{name} not found in store. Can't remove.")
+
     def get_inventory(self, host_filter=None, refresh=False):
         """
         Return the storage inventory of hosts matching the given filter.
@@ -2123,6 +2163,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         return self._add_daemon('mgr', spec, self._create_mgr)
 
     def apply_mgr(self, spec):
+        self.save_spec(spec)
+        self._kick_serve_loop()
+        return trivial_result("scheduled MGR creation..")
+
+    def _apply_mgr(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply_service('mgr', spec, self._create_mgr)
 
@@ -2130,7 +2175,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
 
-    def apply_mds(self, spec):
+    def apply_mds(self, spec: orchestrator.ServiceSpec) -> orchestrator.Completion:
+        self.save_spec(spec)
+        self._kick_serve_loop()
+        return trivial_result("scheduled MDS creation..")
+
+    def _apply_mds(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply_service('mds', spec, self._create_mds,
                                    self._config_mds)
@@ -2187,6 +2237,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
 
     def apply_rgw(self, spec):
+        self.save_spec(spec)
+        self._kick_serve_loop()
+        return trivial_result("scheduled RGW creation..")
+
+    def _apply_rgw(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply_service('rgw', spec, self._create_rgw,
                                    self._config_rgw)
@@ -2206,6 +2261,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                                    keyring=keyring)
 
     def apply_rbd_mirror(self, spec):
+        self.save_spec(spec)
+        self._kick_serve_loop()
+        return trivial_result("scheduled rbd-mirror creation..")
+
+    def _apply_rbd_mirror(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply_service('rbd-mirror', spec, self._create_rbd_mirror)
 
@@ -2405,16 +2465,26 @@ receivers:
     def _create_prometheus(self, daemon_id, host):
         return self._create_daemon('prometheus', daemon_id, host)
 
-    def apply_prometheus(self, spec):
+    def _apply_prometheus(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply_service('prometheus', spec, self._create_prometheus)
 
+    def apply_prometheus(self, spec):
+        self.save_spec(spec)
+        self._kick_serve_loop()
+        return trivial_result("scheduled prometheus creation..")
+
     def add_node_exporter(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._add_daemon('node-exporter', spec,
                                 self._create_node_exporter)
 
     def apply_node_exporter(self, spec):
+        self.save_spec(spec)
+        self._kick_serve_loop()
+        return trivial_result("scheduled node-exporter creation..")
+
+    def _apply_node_exporter(self, spec):
         # type: (orchestrator.ServiceSpec) -> AsyncCompletion
         return self._apply_service('node-exporter', spec,
                                    self._create_node_exporter)
@@ -2676,6 +2746,136 @@ receivers:
         """
         return trivial_result(self.osd_removal_report)
 
+    def save_spec(self, spec: ServiceSpec) -> None:
+        """
+        Attempts to save a ServiceSpec.
+
+        There are two ways to manipulate the service_specs stored in the mon_store
+        1) Using a global definition (i.e. `ceph orch apply -i <specs_file.yaml>`)
+            This will usually contain multiple definitions of services and daemons.
+
+        2) Using the CLI for specific service types (i.e. `ceph orch apply rgw 3 --realm foo --zone bar`)
+
+         Raises `OrchestratorError` if an entry with the same named identifier already exist to prevent overwrites from two different sources.
+        """
+        store = PersistentStoreDict(self, f'service_spec/{spec.service_type}')
+        name: str = spec.name or spec.service_type
+        try:
+            _ = store[name]
+            raise orchestrator.OrchestratorError(f"Specification for {name} already exists. "
+                                                 "Please review your existing specs with "
+                                                 "`ceph orch servicespecs ls` and try again.")
+        except IndexError:
+            self.log.info(f"New spec found. Saving <{name}> to the store.")
+            store[name] = spec.to_json()
+
+    def list_specs(self) -> orchestrator.Completion:
+        """
+        Loads all entries from the service_spec mon_store root.
+        """
+        specs = list()
+        for _, spec in self.service_spec_store.items():
+            specs.append('---')
+            specs.append(yaml.dump(spec))
+        return trivial_result(specs)
+
+    def clear_all_specs(self) -> orchestrator.Completion:
+        """
+        Clears the service_spec root in the mon_store (mgr/cephadm/service_specs/)
+        """
+        self.service_spec_store.clear()
+        return trivial_result(f"Mon store for {self.service_spec_store.prefix} cleared")
+
+    def apply_service_config(self, spec_document: str) -> orchestrator.Completion:
+        """
+        Parse a multi document yaml file (represented in a inbuf object)
+        and loads it with it's respective ServiceSpec to validate the
+        initial input.
+        If no errors are raised `save_spec` is called.
+        """
+        content: Iterator[Any] = yaml.load_all(spec_document)
+        # Load all specs from a multi document yaml file.
+        for spec in content:
+            # load ServiceSpec once to validate
+            spec_o = ServiceSpec.from_json(spec)
+            self.save_spec(spec_o)
+        self._kick_serve_loop()
+        return trivial_result("ServiceSpecs saved")
+
+    def trigger_deployment(self, service_name: str,
+                           func: Callable[[ServiceSpec], orchestrator.Completion]) -> List[orchestrator.Completion]:
+        """
+        Triggers a corresponding deployment method `func` to `service_name`
+        Services can have multiple entries. (i.e. different RGW configurations)
+        """
+        self.log.debug(f"starting async {service_name} deployment")
+        specs = self.find_json_specs(service_name)
+        completions = list()
+        for spec in specs:
+            completions.append(func(spec))
+        if completions:
+            return completions
+        return [trivial_result("Nothing to do..")]
+
+    def find_json_specs(self, find_service_type: str) -> List[ServiceSpec]:
+        """
+        Inspects the mon_store and gathers entries for the `find_service_type`
+        (i.e. 'mgr', 'rgw') service.
+        Some services have individual ServiceSpecs (rgw->RGWSpec, nfs->NFSServiceSpec)
+        so we need to make the distinction.
+        """
+        specs = list()
+        self.log.debug(f"Checking for type {find_service_type}")
+        store = PersistentStoreDict(self, f'service_spec/{find_service_type}')
+        for service_type, json_specs in store.items():
+
+            if isinstance(json_specs, dict):
+                self.log.debug(f"Found dict in json_specs: No need to decode")
+            elif isinstance(json_specs, str):
+                self.log.debug(f"Found str in json_specs: Decoding from json")
+                json_specs = json.loads(json_specs)
+
+            self.log.debug(f"Found service_type: {service_type} in the k-v store. Adding..")
+            specs.append(ServiceSpec.from_json(json_specs))
+        self.log.debug(f"Found {specs} specs.")
+        return specs
+
+    def _apply_services(self) -> List[orchestrator.Completion]:
+        """
+        This is a method that is supposed to run continuously in the
+        server() thread.
+        It will initiate deployments based on the presence of a ServiceSpec
+        in the persistent mon_store.
+        There is a defined order in which the services should be deployed
+        Defined order:
+        # mon -> mgr -> osd -> monitoring -> mds -> rgw -> nfs -> iscsi -> rbd-mirror
+
+        Special cases:
+        * Mons scaling is currently not implemented.
+        * OSDs are daemons that are handled differently and may not fit in this paradigm
+
+        The serve() thread processes the completions serially, which ensures the adherence to
+        the defined order.
+        """
+
+        super_completions: List[orchestrator.Completion] = list()
+        super_completions.extend(self.trigger_deployment('mgr', self._apply_mgr))
+        super_completions.extend(self.trigger_deployment('prometheus', self._apply_prometheus))
+        super_completions.extend(self.trigger_deployment('node-exporter', self._apply_node_exporter))
+        super_completions.extend(self.trigger_deployment('mds', self._apply_mds))
+        super_completions.extend(self.trigger_deployment('rgw', self._apply_rgw))
+        super_completions.extend(self.trigger_deployment('rbd-mirror', self._apply_rbd_mirror))
+
+        # Not implemented
+
+        # super_completions.extend(trigger_deployment('mon', self._apply_mon))
+        # super_completions.extend(trigger_deployment('nfs', self._apply_nfs))
+        # super_completions.extend(trigger_deployment('grafana', self._apply_grafana))
+        # super_completions.extend(trigger_deployment('iscsi', self._apply_iscsi))
+
+        # Not implemented
+        return super_completions
+
 
 class BaseScheduler(object):
     """
index e1350c516ee43df95a1326c399585bc0d4797563..3976dc1030b761c26e9bfe8cd5ce1dda45289175 100644 (file)
@@ -137,12 +137,12 @@ class TestCephadm(object):
     def test_mon_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
-            c = cephadm_module.add_mon(ServiceSpec(placement=ps))
+            c = cephadm_module.add_mon(ServiceSpec(placement=ps, service_type='mon'))
             assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
 
             with pytest.raises(OrchestratorError, match="is missing a network spec"):
                 ps = PlacementSpec(hosts=['test'], count=1)
-                c = cephadm_module.add_mon(ServiceSpec(placement=ps))
+                c = cephadm_module.add_mon(ServiceSpec(placement=ps, service_type='mon'))
                 wait(cephadm_module, c)
 
     @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
@@ -154,7 +154,7 @@ class TestCephadm(object):
     def test_mgr_update(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
-            c = cephadm_module.apply_mgr(ServiceSpec(placement=ps))
+            c = cephadm_module._apply_mgr(ServiceSpec(placement=ps, service_type='mgr'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed mgr.* on host 'test'")
 
@@ -202,7 +202,7 @@ class TestCephadm(object):
     def test_mds(self, _send_command, _get_connection, _save_host, _rm_host, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
-            c = cephadm_module.add_mds(ServiceSpec('name', placement=ps))
+            c = cephadm_module.add_mds(ServiceSpec('name', placement=ps, service_type='mds'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed mds.name.* on host 'test'")
 
@@ -216,7 +216,7 @@ class TestCephadm(object):
 
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
-            c = cephadm_module.add_rgw(RGWSpec('realm', 'zone', placement=ps))
+            c = cephadm_module.add_rgw(RGWSpec('realm', 'zone', placement=ps, service_type='rgw'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed rgw.realm.zone.* on host 'test'")
 
@@ -232,12 +232,12 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'host1'):
             with self._with_host(cephadm_module, 'host2'):
                 ps = PlacementSpec(hosts=['host1'], count=1)
-                c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps))
+                c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
                 [out] = wait(cephadm_module, c)
                 match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
 
                 ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
-                c = cephadm_module.apply_rgw(RGWSpec('realm', 'zone1', placement=ps))
+                c = cephadm_module._apply_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
                 [out] = wait(cephadm_module, c)
                 match_glob(out, "Deployed rgw.realm.zone1.host2.* on host 'host2'")
 
@@ -252,12 +252,12 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'host1'):
             with self._with_host(cephadm_module, 'host2'):
                 ps = PlacementSpec(hosts=['host1'], count=1)
-                c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps))
+                c = cephadm_module.add_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
                 [out] = wait(cephadm_module, c)
                 match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
 
                 ps = PlacementSpec(hosts=['host2'], count=1)
-                c = cephadm_module.add_rgw(RGWSpec('realm', 'zone2', placement=ps))
+                c = cephadm_module.add_rgw(RGWSpec('realm', 'zone2', placement=ps, service_type='rgw'))
                 [out] = wait(cephadm_module, c)
                 match_glob(out, "Deployed rgw.realm.zone2.host2.* on host 'host2'")
 
@@ -267,7 +267,7 @@ class TestCephadm(object):
 
                 with pytest.raises(OrchestratorError):
                     ps = PlacementSpec(hosts=['host1', 'host2'], count=3)
-                    c = cephadm_module.apply_rgw(RGWSpec('realm', 'zone1', placement=ps))
+                    c = cephadm_module._apply_rgw(RGWSpec('realm', 'zone1', placement=ps, service_type='rgw'))
                     [out] = wait(cephadm_module, c)
 
 
@@ -285,7 +285,8 @@ class TestCephadm(object):
     ))
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
-    def test_remove_daemon(self, _save_host, _rm_host, cephadm_module):
+    @mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
+    def test_remove_daemon(self, _save_host, _rm_host, _save_spec, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
             wait(cephadm_module, c)
@@ -307,7 +308,8 @@ class TestCephadm(object):
     ))
     @mock.patch("cephadm.module.HostCache.save_host")
     @mock.patch("cephadm.module.HostCache.rm_host")
-    def test_remove_service(self, _save_host, _rm_host, cephadm_module):
+    @mock.patch("cephadm.module.CephadmOrchestrator._remove_key_from_store")
+    def test_remove_service(self, _save_host, _rm_host, _save_spec, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
             wait(cephadm_module, c)
@@ -325,7 +327,7 @@ class TestCephadm(object):
         # type: (mock.Mock, mock.Mock, mock.Mock, mock.Mock, CephadmOrchestrator) -> None
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
-            c = cephadm_module.add_rbd_mirror(ServiceSpec(name='name', placement=ps))
+            c = cephadm_module.add_rbd_mirror(ServiceSpec(name='name', placement=ps, service_type='rbd-mirror'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed rbd-mirror.* on host 'test'")
 
@@ -340,7 +342,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
 
-            c = cephadm_module.add_prometheus(ServiceSpec(placement=ps))
+            c = cephadm_module.add_prometheus(ServiceSpec(placement=ps, service_type='prometheus'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed prometheus.* on host 'test'")
 
@@ -355,7 +357,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
 
-            c = cephadm_module.add_node_exporter(ServiceSpec(placement=ps))
+            c = cephadm_module.add_node_exporter(ServiceSpec(placement=ps, service_type='node-exporter'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed node-exporter.* on host 'test'")
 
@@ -370,7 +372,7 @@ class TestCephadm(object):
         with self._with_host(cephadm_module, 'test'):
             ps = PlacementSpec(hosts=['test'], count=1)
 
-            c = cephadm_module.add_grafana(ServiceSpec(placement=ps))
+            c = cephadm_module.add_grafana(ServiceSpec(placement=ps, service_type='grafana'))
             [out] = wait(cephadm_module, c)
             match_glob(out, "Deployed grafana.* on host 'test'")
 
index 2b62d0672b40f37fcb981a0b60f95f6de511f639..2d33651cd87eff4e01fa2b33db6e34c0167bf2c8 100644 (file)
@@ -1562,7 +1562,7 @@ class PersistentStoreDict(object):
     def __init__(self, mgr, prefix):
         # type: (MgrModule, str) -> None
         self.mgr = mgr
-        self.prefix = prefix + '.'
+        self.prefix = prefix + '/'
 
     def _mk_store_key(self, key):
         return self.prefix + key
@@ -1587,7 +1587,7 @@ class PersistentStoreDict(object):
                 self.__missing__(key)
             return json.loads(val)
         except (KeyError, AttributeError, IndexError, ValueError, TypeError):
-            logging.getLogger(__name__).exception('failed to deserialize')
+            logging.getLogger(__name__).debug('failed to deserialize item in store')
             self.mgr.set_store(key, None)
             raise
 
index 7e79329453d7b733b4597050b732c66a96c186b0..b5bc21cbe90fbe6c4ea81cd71725e809f2d3eb1e 100644 (file)
@@ -8,6 +8,7 @@ import copy
 import functools
 import logging
 import pickle
+import json
 import sys
 import time
 from collections import namedtuple
@@ -913,6 +914,12 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
+    def apply_service_config(self, spec_document: str) -> Completion:
+        """
+        Saves Service Specs from a yaml|json file
+        """
+        raise NotImplementedError()
+
     def remove_daemons(self, names, force):
         # type: (List[str], bool) -> Completion
         """
@@ -922,6 +929,20 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
+    def list_specs(self):
+        # type: () -> Completion
+        """
+        Lists saved service specs
+        """
+        raise NotImplementedError()
+
+    def clear_all_specs(self):
+        # type: () -> Completion
+        """
+        Lists saved service specs
+        """
+        raise NotImplementedError()
+
     def remove_service(self, service_name):
         # type: (str) -> Completion
         """
@@ -1222,7 +1243,6 @@ class PlacementSpec(object):
             else:
                 self.hosts = [HostPlacementSpec.parse(x, require_network=False) for x in hosts if x]
 
-
         self.count = count  # type: Optional[int]
         self.all_hosts = all_hosts  # type: bool
 
@@ -1384,12 +1404,18 @@ class DaemonDescription(object):
             return self.name().startswith(service_name + '.')
         return False
 
-    def service_name(self):
+    def service_name(self, without_service_type=False):
         if self.daemon_type == 'rgw':
             v = self.daemon_id.split('.')
-            return 'rgw.%s' % ('.'.join(v[0:2]))
+            s_name = '.'.join(v[0:2])
+            if without_service_type:
+                return s_name
+            return 'rgw.%s' % s_name
         if self.daemon_type in ['mds', 'nfs']:
-            return 'mds.%s' % (self.daemon_id.split('.')[0])
+            _s_name = self.daemon_id.split('.')[0]
+            if without_service_type:
+                return _s_name
+            return 'mds.%s' % _s_name
         return self.daemon_type
 
     def __repr__(self):
@@ -1442,6 +1468,7 @@ class ServiceDescription(object):
                  service_url=None,
                  last_refresh=None,
                  size=0,
+                 spec_presence='absent',
                  running=0):
         # Not everyone runs in containers, but enough people do to
         # justify having the container_image_id (image hash) and container_image
@@ -1470,6 +1497,11 @@ class ServiceDescription(object):
         # datetime when this info was last refreshed
         self.last_refresh = last_refresh   # type: Optional[datetime.datetime]
 
+        # status string to indicate the presence of a persistent servicespec
+        # possible strings are "absent", "present" and "not applicable" while
+        # the "not applicable" is mostly for OSDs.
+        self.spec_presence = spec_presence
+
     def service_type(self):
         if self.service_name:
             return self.service_name.split('.')[0]
@@ -1514,8 +1546,8 @@ class ServiceSpec(object):
 
     """
 
-    def __init__(self, name=None, placement=None):
-        # type: (Optional[str], Optional[PlacementSpec]) -> None
+    def __init__(self, name=None, placement=None, service_type=None, count=None):
+        # type: (Optional[str], Optional[PlacementSpec], Optional[str], Optional[int]) -> None
         self.placement = PlacementSpec() if placement is None else placement  # type: PlacementSpec
 
         #: Give this set of stateless services a name: typically it would
@@ -1523,12 +1555,46 @@ class ServiceSpec(object):
         #: within one ceph cluster. Note: Not all clusters have a name
         self.name = name  # type: Optional[str]
 
+        assert service_type
+        self.service_type = service_type
+
         if self.placement is not None and self.placement.count is not None:
             #: Count of service instances. Deprecated.
             self.count = self.placement.count  # type: int
         else:
             self.count = 1
 
+    @classmethod
+    def from_json(cls, json_spec: dict) -> "ServiceSpec":
+        """
+        Initialize 'ServiceSpec' object data from a json structure
+        :param json_spec: A valid dict with ServiceSpec
+        """
+        args: Dict[str, Dict[Any, Any]] = {}
+        service_type = json_spec.get('service_type', '')
+        assert service_type
+        if service_type == 'rgw':
+            _cls = RGWSpec  # type: ignore
+        elif service_type == 'nfs':
+            _cls = NFSServiceSpec  # type: ignore
+        else:
+            _cls = ServiceSpec  # type: ignore
+        for k, v in json_spec.items():
+            if k == 'placement':
+                v = PlacementSpec.from_dict(v)
+            if k == 'spec':
+                args.update(v)
+                continue
+            args.update({k: v})
+        return _cls(**args)  # type: ignore
+
+    def to_json(self):
+        return json.dumps(self, default=lambda o: o.__dict__,
+                          sort_keys=True, indent=4)
+
+    def __repr__(self):
+        return "{}({!r})".format(self.__class__.__name__, self.__dict__)
+
 
 def servicespec_validate_add(self: ServiceSpec):
     # This must not be a method of ServiceSpec, otherwise you'll hunt
@@ -1552,10 +1618,9 @@ def servicespec_validate_hosts_have_network_spec(self: ServiceSpec):
             raise OrchestratorValidationError(m)
 
 
-
 class NFSServiceSpec(ServiceSpec):
-    def __init__(self, name, pool=None, namespace=None, placement=None):
-        super(NFSServiceSpec, self).__init__(name, placement)
+    def __init__(self, name, pool=None, namespace=None, placement=None, service_type=None):
+        super(NFSServiceSpec, self).__init__(name=name, placement=placement, service_type=service_type)
 
         #: RADOS pool where NFS client recovery data is stored.
         self.pool = pool
@@ -1579,6 +1644,8 @@ class RGWSpec(ServiceSpec):
                  rgw_realm,  # type: str
                  rgw_zone,  # type: str
                  placement=None,
+                 service_type=None,  # type: Optional[str]
+                 name=None,  # type: Optional[str]
                  hosts=None,  # type: Optional[List[str]]
                  rgw_multisite=None,  # type: Optional[bool]
                  rgw_zonemaster=None,  # type: Optional[bool]
@@ -1596,7 +1663,7 @@ class RGWSpec(ServiceSpec):
         # in Rook itself. Thus we don't set any defaults here in this class.
 
         super(RGWSpec, self).__init__(name=rgw_realm + '.' + rgw_zone,
-                                      placement=placement)
+                                      placement=placement, service_type=service_type)
 
         #: List of hosts where RGWs should run. Not for Rook.
         if hosts:
@@ -1625,8 +1692,8 @@ class RGWSpec(ServiceSpec):
     @property
     def rgw_multisite_endpoints_list(self):
         return ",".join(["{}://{}:{}".format(self.rgw_multisite_proto,
-                             host,
-                             self.rgw_frontend_port) for host in self.placement.hosts])
+                                             host,
+                                             self.rgw_frontend_port) for host in self.placement.hosts])
 
     def genkey(self, nchars):
         """ Returns a random string of nchars
@@ -1639,17 +1706,6 @@ class RGWSpec(ServiceSpec):
                                      string.ascii_lowercase +
                                      string.digits) for _ in range(nchars))
 
-    @classmethod
-    def from_json(cls, json_rgw_spec):
-        # type: (dict) -> RGWSpec
-        """
-        Initialize 'RGWSpec' object data from a json structure
-        :param json_rgw_spec: A valid dict with a the RGW settings
-        """
-        # TODO: also add PlacementSpec(**json_rgw_spec['placement'])
-        args = {k:v for k, v in json_rgw_spec.items()}
-        return RGWSpec(**args)
-
 
 class InventoryFilter(object):
     """
index dfdcc8b8947d415a87b78aaf23a5c3499f7f998a..738966c177aa200f2425d2a87fd78ddba22f7a49 100644 (file)
@@ -335,7 +335,7 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule):
         else:
             now = datetime.datetime.utcnow()
             table = PrettyTable(
-                ['NAME', 'RUNNING', 'REFRESHED', 'IMAGE NAME', 'IMAGE ID'],
+                ['NAME', 'RUNNING', 'REFRESHED', 'IMAGE NAME', 'IMAGE ID', 'SPEC'],
                 border=False)
             table.align['NAME'] = 'l'
             table.align['RUNNING'] = 'r'
@@ -354,7 +354,9 @@ class OrchestratorCli(OrchestratorClientMixin, MgrModule):
                     '%d/%d' % (s.running, s.size),
                     age,
                     ukn(s.container_image_name),
-                    ukn(s.container_image_id)[0:12]))
+                    ukn(s.container_image_id)[0:12],
+                    ukn(s.spec_presence)
+                ))
 
             return HandleCommandResult(stdout=table.get_string())
 
@@ -507,7 +509,7 @@ Usage:
         placement = PlacementSpec(label=label, count=num, hosts=hosts)
         placement.validate()
 
-        spec = ServiceSpec(placement=placement)
+        spec = ServiceSpec(placement=placement, service_type='mon')
 
         completion = self.add_mon(spec)
         self._orchestrator_wait([completion])
@@ -527,6 +529,15 @@ Usage:
         raise_if_exception(completion)
         return HandleCommandResult(stdout=completion.result_str())
 
+    @_cli_write_command(
+        'orch apply',
+        desc='Applies a Service Specification from a file. ceph orch apply -i $file')
+    def _apply_services(self, inbuf):
+        completion = self.apply_service_config(inbuf)
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        return HandleCommandResult(stdout=completion.result_str())
+
     @_cli_write_command(
         'orch daemon add rbd-mirror',
         "name=num,type=CephInt,req=false "
@@ -718,6 +729,26 @@ Usage:
         raise_if_exception(completion)
         return HandleCommandResult(stdout=completion.result_str())
 
+    @_cli_write_command(
+        'orch spec dump',
+        desc='List all Service specs')
+    def _get_service_specs(self):
+        completion = self.list_specs()
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        specs = completion.result_str()
+        return HandleCommandResult(stdout=specs)
+
+    @_cli_write_command(
+        'orch servicespecs clear',
+        desc='Clear all Service specs')
+    def _clear_service_specs(self):
+        completion = self.clear_all_specs()
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        return HandleCommandResult(stdout=completion.result_str())
+
+
     @_cli_write_command(
         'orch apply mgr',
         "name=num,type=CephInt,req=false "
@@ -729,7 +760,7 @@ Usage:
             label=label, count=num, hosts=hosts)
         placement.validate()
 
-        spec = ServiceSpec(placement=placement)
+        spec = ServiceSpec(placement=placement, service_type='mgr')
 
         completion = self.apply_mgr(spec)
         self._orchestrator_wait([completion])
@@ -766,11 +797,10 @@ Usage:
     def _apply_mds(self, fs_name, num=None, label=None, hosts=[]):
         placement = PlacementSpec(label=label, count=num, hosts=hosts)
         placement.validate()
-
         spec = ServiceSpec(
-            fs_name,
+            service_type='mds',
+            name=fs_name,
             placement=placement)
-
         completion = self.apply_mds(spec)
         self._orchestrator_wait([completion])
         raise_if_exception(completion)
@@ -784,7 +814,8 @@ Usage:
         'Update the number of rbd-mirror instances')
     def _apply_rbd_mirror(self, num, label=None, hosts=[]):
         spec = ServiceSpec(
-            placement=PlacementSpec(hosts=hosts, count=num, label=label))
+            placement=PlacementSpec(hosts=hosts, count=num, label=label),
+            service_type='rbd-mirror')
         completion = self.apply_rbd_mirror(spec)
         self._orchestrator_wait([completion])
         raise_if_exception(completion)
@@ -800,6 +831,7 @@ Usage:
         'Update the number of RGW instances for the given zone')
     def _apply_rgw(self, zone_name, realm_name, num=None, label=None, hosts=[]):
         spec = RGWSpec(
+            service_type='rgw',
             rgw_realm=realm_name,
             rgw_zone=zone_name,
             placement=PlacementSpec(hosts=hosts, label=label, count=num))
@@ -835,6 +867,7 @@ Usage:
         # type: (Optional[int], Optional[str], List[str]) -> HandleCommandResult
         spec = ServiceSpec(
             placement=PlacementSpec(label=label, hosts=hosts, count=num),
+            service_type='prometheus'
         )
         completion = self.apply_prometheus(spec)
         self._orchestrator_wait([completion])
@@ -850,6 +883,7 @@ Usage:
         # type: (Optional[int], Optional[str], List[str]) -> HandleCommandResult
         spec = ServiceSpec(
             placement=PlacementSpec(label=label, hosts=hosts, count=num),
+            service_type='node-exporter'
         )
         completion = self.apply_node_exporter(spec)
         self._orchestrator_wait([completion])
index 3b51636041574bf53064a9631a3f92aacad47f6e..3fa5fc8d53fe36f8795817cc10716ef1e3533bb3 100644 (file)
@@ -117,6 +117,7 @@ def test_rgwspec():
     """
     {
         "rgw_zone": "zonename",
+        "service_type": "rgw",
         "rgw_frontend_port": 8080,
         "rgw_zonegroup": "group",
         "rgw_zone_user": "user",