]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: Adapt cephadm to new orch interface
authorSebastian Wagner <sebastian.wagner@suse.com>
Mon, 8 Feb 2021 00:37:07 +0000 (01:37 +0100)
committerSebastian Wagner <sebastian.wagner@suse.com>
Mon, 1 Mar 2021 15:48:54 +0000 (16:48 +0100)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/fixtures.py
src/pybind/mgr/cephadm/tests/test_completion.py
src/pybind/mgr/orchestrator/__init__.py

index afd9314dead8ca0af4d7492ce2beaa298bffb2b6..615179a2b9347a36be73766a860431fdc218b281 100644 (file)
@@ -34,7 +34,7 @@ from mgr_util import create_self_signed_cert
 import secrets
 import orchestrator
 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
-    CLICommandMeta, DaemonDescription, DaemonDescriptionStatus
+    CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error
 from orchestrator._interface import GenericSpec
 from orchestrator._interface import daemon_type_to_service
 
@@ -95,24 +95,6 @@ DEFAULT_ALERT_MANAGER_IMAGE = 'docker.io/prom/alertmanager:v0.20.0'
 # ------------------------------------------------------------------------------
 
 
-class CephadmCompletion(orchestrator.Completion[T]):
-    def evaluate(self) -> None:
-        self.finalize(None)
-
-
-def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
-    """
-    Decorator to make CephadmCompletion methods return
-    a completion object that executes themselves.
-    """
-
-    @wraps(f)
-    def wrapper(*args: Any, **kwargs: Any) -> CephadmCompletion:
-        return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))
-
-    return wrapper
-
-
 def service_inactive(spec_name: str) -> Callable:
     def inner(func: Callable) -> Callable:
         @wraps(func)
@@ -385,8 +367,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.health_checks: Dict[str, dict] = {}
 
-        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
-
         self.inventory = Inventory(self)
 
         self.cache = HostCache(self)
@@ -695,17 +675,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         return True, err, ret
 
-    def process(self, completions: List[CephadmCompletion]) -> None:  # type: ignore
-        """
-        Does nothing, as completions are processed in another thread.
-        """
-        if completions:
-            self.log.debug("process: completions={0}".format(
-                orchestrator.pretty_print(completions)))
-
-            for p in completions:
-                p.evaluate()
-
     @orchestrator._cli_write_command(
         prefix='cephadm set-ssh-config')
     def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
@@ -1213,11 +1182,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.log.info('Added host %s' % spec.hostname)
         return "Added host '{}'".format(spec.hostname)
 
-    @trivial_completion
+    @handle_orch_error
     def add_host(self, spec: HostSpec) -> str:
         return self._add_host(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def remove_host(self, host):
         # type: (str) -> str
         """
@@ -1232,7 +1201,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.log.info('Removed host %s' % host)
         return "Removed host '{}'".format(host)
 
-    @trivial_completion
+    @handle_orch_error
     def update_host_addr(self, host: str, addr: str) -> str:
         self.inventory.set_addr(host, addr)
         self._reset_con(host)
@@ -1240,7 +1209,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.log.info('Set host %s addr to %s' % (host, addr))
         return "Updated host '{}' addr to '{}'".format(host, addr)
 
-    @trivial_completion
+    @handle_orch_error
     def get_hosts(self):
         # type: () -> List[orchestrator.HostSpec]
         """
@@ -1251,13 +1220,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         """
         return list(self.inventory.all_specs())
 
-    @trivial_completion
+    @handle_orch_error
     def add_host_label(self, host: str, label: str) -> str:
         self.inventory.add_label(host, label)
         self.log.info('Added label %s to host %s' % (label, host))
         return 'Added label %s to host %s' % (label, host)
 
-    @trivial_completion
+    @handle_orch_error
     def remove_host_label(self, host: str, label: str) -> str:
         self.inventory.rm_label(host, label)
         self.log.info('Removed label %s to host %s' % (label, host))
@@ -1298,7 +1267,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                        + 'Note the following:\n\n' + '\n'.join(notifications))
         return 0, f'It is presumed safe to stop host {hostname}'
 
-    @trivial_completion
+    @handle_orch_error
     def host_ok_to_stop(self, hostname: str) -> str:
         if hostname not in self.cache.get_hosts():
             raise OrchestratorError(f'Cannot find host "{hostname}"')
@@ -1325,7 +1294,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             }
         self.set_health_checks(self.health_checks)
 
-    @trivial_completion
+    @handle_orch_error
     @host_exists()
     def enter_host_maintenance(self, hostname: str, force: bool = False) -> str:
         """ Attempt to place a cluster host in maintenance
@@ -1395,7 +1364,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         return f"Ceph cluster {self._cluster_fsid} on {hostname} moved to maintenance"
 
-    @trivial_completion
+    @handle_orch_error
     @host_exists()
     def exit_host_maintenance(self, hostname: str) -> str:
         """Exit maintenance mode and return a host to an operational state
@@ -1464,7 +1433,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self._kick_serve_loop()
 
-    @trivial_completion
+    @handle_orch_error
     def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
                          refresh: bool = False) -> List[orchestrator.ServiceDescription]:
         if refresh:
@@ -1564,7 +1533,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                 sm[n].size = sm[n].size * 2
         return list(sm.values())
 
-    @trivial_completion
+    @handle_orch_error
     def list_daemons(self,
                      service_name: Optional[str] = None,
                      daemon_type: Optional[str] = None,
@@ -1589,7 +1558,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                 result.append(dd)
         return result
 
-    @trivial_completion
+    @handle_orch_error
     def service_action(self, action: str, service_name: str) -> List[str]:
         dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
         if not dds:
@@ -1661,7 +1630,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
             })
 
-    @trivial_completion
+    @handle_orch_error
     def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
         d = self.cache.get_daemon(daemon_name)
         assert d.daemon_type is not None
@@ -1694,7 +1663,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self._kick_serve_loop()
         return msg
 
-    @trivial_completion
+    @handle_orch_error
     def remove_daemons(self, names):
         # type: (List[str]) -> List[str]
         args = []
@@ -1707,7 +1676,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.log.info('Remove daemons %s' % [a[0] for a in args])
         return self._remove_daemons(args)
 
-    @trivial_completion
+    @handle_orch_error
     def remove_service(self, service_name: str) -> str:
         self.log.info('Remove service %s' % service_name)
         self._trigger_preview_refresh(service_name=service_name)
@@ -1723,7 +1692,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             # must be idempotent: still a success.
             return f'Failed to remove service. <{service_name}> was not found.'
 
-    @trivial_completion
+    @handle_orch_error
     def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
         """
         Return the storage inventory of hosts matching the given filter.
@@ -1752,7 +1721,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                                                      inventory.Devices(dls)))
         return result
 
-    @trivial_completion
+    @handle_orch_error
     def zap_device(self, host: str, path: str) -> str:
         self.log.info('Zap device %s:%s' % (host, path))
         out, err, code = CephadmServe(self)._run_cephadm(
@@ -1764,7 +1733,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
         return '\n'.join(out + err)
 
-    @trivial_completion
+    @handle_orch_error
     def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
         """
         Blink a device light. Calling something like::
@@ -1843,7 +1812,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
             self.cache.osdspec_previews_refresh_queue.append(host)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
         """
         Deprecated. Please use `apply()` instead.
@@ -1852,7 +1821,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         """
         return [self._apply(spec) for spec in specs]
 
-    @trivial_completion
+    @handle_orch_error
     def create_osds(self, drive_group: DriveGroupSpec) -> str:
         return self.osd_service.create_from_spec(drive_group)
 
@@ -1974,16 +1943,16 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         return create_func_map(args)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_mon(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def add_mon(self, spec):
         # type: (ServiceSpec) -> List[str]
         return self._add_daemon('mon', spec, self.mon_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def add_mgr(self, spec):
         # type: (ServiceSpec) -> List[str]
         return self._add_daemon('mgr', spec, self.mgr_service.prepare_create)
@@ -2023,7 +1992,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             'remove': [d.hostname for d in remove_daemon_hosts]
         }
 
-    @trivial_completion
+    @handle_orch_error
     def plan(self, specs: Sequence[GenericSpec]) -> List:
         results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
                                'to the current inventory setup. If any on these conditions changes, the \n'
@@ -2074,59 +2043,59 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self._kick_serve_loop()
         return "Scheduled %s update..." % spec.service_name()
 
-    @trivial_completion
+    @handle_orch_error
     def apply(self, specs: Sequence[GenericSpec]) -> List[str]:
         results = []
         for spec in specs:
             results.append(self._apply(spec))
         return results
 
-    @trivial_completion
+    @handle_orch_error
     def apply_mgr(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def add_mds(self, spec: ServiceSpec) -> List[str]:
         return self._add_daemon('mds', spec, self.mds_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_mds(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def add_rgw(self, spec: ServiceSpec) -> List[str]:
         return self._add_daemon('rgw', spec, self.rgw_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_rgw(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_ha_rgw(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def add_iscsi(self, spec):
         # type: (ServiceSpec) -> List[str]
         return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_iscsi(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def add_rbd_mirror(self, spec: ServiceSpec) -> List[str]:
         return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def add_nfs(self, spec: ServiceSpec) -> List[str]:
         return self._add_daemon('nfs', spec, self.nfs_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_nfs(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
@@ -2134,73 +2103,73 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         # type: () -> str
         return self.get('mgr_map').get('services', {}).get('dashboard', '')
 
-    @trivial_completion
+    @handle_orch_error
     def add_prometheus(self, spec: ServiceSpec) -> List[str]:
         return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_prometheus(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def add_node_exporter(self, spec):
         # type: (ServiceSpec) -> List[str]
         return self._add_daemon('node-exporter', spec,
                                 self.node_exporter_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_node_exporter(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def add_crash(self, spec):
         # type: (ServiceSpec) -> List[str]
         return self._add_daemon('crash', spec,
                                 self.crash_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_crash(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def add_grafana(self, spec):
         # type: (ServiceSpec) -> List[str]
         return self._add_daemon('grafana', spec, self.grafana_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_grafana(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def add_alertmanager(self, spec):
         # type: (ServiceSpec) -> List[str]
         return self._add_daemon('alertmanager', spec, self.alertmanager_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_alertmanager(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def add_container(self, spec: ServiceSpec) -> List[str]:
         return self._add_daemon('container', spec,
                                 self.container_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_container(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def add_cephadm_exporter(self, spec):
         # type: (ServiceSpec) -> List[str]
         return self._add_daemon('cephadm-exporter',
                                 spec,
                                 self.cephadm_exporter_service.prepare_create)
 
-    @trivial_completion
+    @handle_orch_error
     def apply_cephadm_exporter(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
-    @trivial_completion
+    @handle_orch_error
     def upgrade_check(self, image: str, version: str) -> str:
         if self.inventory.get_host_with_state("maintenance"):
             raise OrchestratorError("check aborted - you have hosts in maintenance state")
@@ -2237,29 +2206,29 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         return json.dumps(r, indent=4, sort_keys=True)
 
-    @trivial_completion
+    @handle_orch_error
     def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
         return self.upgrade.upgrade_status()
 
-    @trivial_completion
+    @handle_orch_error
     def upgrade_start(self, image: str, version: str) -> str:
         if self.inventory.get_host_with_state("maintenance"):
             raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state")
         return self.upgrade.upgrade_start(image, version)
 
-    @trivial_completion
+    @handle_orch_error
     def upgrade_pause(self) -> str:
         return self.upgrade.upgrade_pause()
 
-    @trivial_completion
+    @handle_orch_error
     def upgrade_resume(self) -> str:
         return self.upgrade.upgrade_resume()
 
-    @trivial_completion
+    @handle_orch_error
     def upgrade_stop(self) -> str:
         return self.upgrade.upgrade_stop()
 
-    @trivial_completion
+    @handle_orch_error
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
                     force: bool = False) -> str:
@@ -2293,7 +2262,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self._kick_serve_loop()
         return "Scheduled OSD(s) for removal"
 
-    @trivial_completion
+    @handle_orch_error
     def stop_remove_osds(self, osd_ids: List[str]) -> str:
         """
         Stops a `removal` process for a List of OSDs.
@@ -2310,7 +2279,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self._kick_serve_loop()
         return "Stopped OSD(s) removal"
 
-    @trivial_completion
+    @handle_orch_error
     def remove_osds_status(self) -> List[OSD]:
         """
         The CLI call to retrieve an osd removal report
index 8ded190f31126d9420ca5408c7689c80912df8de..f3d27b352534bc31d6c00fbbdcd61dc19d47da55 100644 (file)
@@ -1,4 +1,3 @@
-import time
 import fnmatch
 from contextlib import contextmanager
 
@@ -12,7 +11,7 @@ except ImportError:
     pass
 
 from cephadm import CephadmOrchestrator
-from orchestrator import raise_if_exception, Completion, HostSpec
+from orchestrator import raise_if_exception, OrchResult, HostSpec
 from tests import mock
 
 
@@ -64,31 +63,8 @@ def with_cephadm_module(module_options=None, store=None):
 
 
 def wait(m, c):
-    # type: (CephadmOrchestrator, Completion) -> Any
-    m.process([c])
-
-    try:
-        # if in debugger
-        import pydevd  # noqa: F401
-        in_debug = True
-    except ImportError:
-        in_debug = False
-
-    if in_debug:
-        while True:    # don't timeout
-            if c.is_finished:
-                raise_if_exception(c)
-                return c.result
-            time.sleep(0.1)
-    else:
-        for i in range(30):
-            if i % 10 == 0:
-                m.process([c])
-            if c.is_finished:
-                raise_if_exception(c)
-                return c.result
-            time.sleep(0.1)
-    assert False, "timeout" + str(c._state)
+    # type: (CephadmOrchestrator, OrchResult) -> Any
+    return raise_if_exception(c)
 
 
 @contextmanager
index 9480ea591ee40b9e2861917ff6be1105ab69b895..327c12d2ad2720d1ee9622a563de85bf16fabf48 100644 (file)
@@ -1,25 +1,10 @@
 import pytest
 
-from .fixtures import wait
-from ..module import trivial_completion, forall_hosts
+from ..module import forall_hosts
 
 
 class TestCompletion(object):
 
-    def test_trivial(self, cephadm_module):
-        @trivial_completion
-        def run(x):
-            return x + 1
-        assert wait(cephadm_module, run(1)) == 2
-
-    def test_exception(self, cephadm_module):
-        @trivial_completion
-        def run(x):
-            raise ValueError
-        c = run(1)
-        with pytest.raises(ValueError):
-            wait(cephadm_module, c)
-
     @pytest.mark.parametrize("input,expected", [
         ([], []),
         ([1], ["(1,)"]),
index 3842a84a3c384a7e33a6f0cc0f4ed4155318d376..d9837e0c08192cb3989f0568d66ceb1f38fbb284 100644 (file)
@@ -4,7 +4,7 @@ from .module import OrchestratorCli
 
 # usage: E.g. `from orchestrator import StatelessServiceSpec`
 from ._interface import \
-    OrchResult, raise_if_exception, \
+    OrchResult, raise_if_exception, handle_orch_error, \
     CLICommand, _cli_write_command, _cli_read_command, CLICommandMeta, \
     Orchestrator, OrchestratorClientMixin, \
     OrchestratorValidationError, OrchestratorError, NoOrchestrator, \