From 68030bec52c98b19c398aff26677f6ff4a71fbe3 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Mon, 8 Feb 2021 01:37:07 +0100 Subject: [PATCH] mgr/cephadm: Adapt cephadm to new orch interface Signed-off-by: Sebastian Wagner --- src/pybind/mgr/cephadm/module.py | 153 +++++++----------- src/pybind/mgr/cephadm/tests/fixtures.py | 30 +--- .../mgr/cephadm/tests/test_completion.py | 17 +- src/pybind/mgr/orchestrator/__init__.py | 2 +- 4 files changed, 66 insertions(+), 136 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index afd9314dead8c..615179a2b9347 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -34,7 +34,7 @@ from mgr_util import create_self_signed_cert import secrets import orchestrator from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \ - CLICommandMeta, DaemonDescription, DaemonDescriptionStatus + CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error from orchestrator._interface import GenericSpec from orchestrator._interface import daemon_type_to_service @@ -95,24 +95,6 @@ DEFAULT_ALERT_MANAGER_IMAGE = 'docker.io/prom/alertmanager:v0.20.0' # ------------------------------------------------------------------------------ -class CephadmCompletion(orchestrator.Completion[T]): - def evaluate(self) -> None: - self.finalize(None) - - -def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]: - """ - Decorator to make CephadmCompletion methods return - a completion object that executes themselves. - """ - - @wraps(f) - def wrapper(*args: Any, **kwargs: Any) -> CephadmCompletion: - return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs)) - - return wrapper - - def service_inactive(spec_name: str) -> Callable: def inner(func: Callable) -> Callable: @wraps(func) @@ -385,8 +367,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.health_checks: Dict[str, dict] = {} - self.all_progress_references = list() # type: List[orchestrator.ProgressReference] - self.inventory = Inventory(self) self.cache = HostCache(self) @@ -695,17 +675,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, return True, err, ret - def process(self, completions: List[CephadmCompletion]) -> None: # type: ignore - """ - Does nothing, as completions are processed in another thread. - """ - if completions: - self.log.debug("process: completions={0}".format( - orchestrator.pretty_print(completions))) - - for p in completions: - p.evaluate() - @orchestrator._cli_write_command( prefix='cephadm set-ssh-config') def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: @@ -1213,11 +1182,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.log.info('Added host %s' % spec.hostname) return "Added host '{}'".format(spec.hostname) - @trivial_completion + @handle_orch_error def add_host(self, spec: HostSpec) -> str: return self._add_host(spec) - @trivial_completion + @handle_orch_error def remove_host(self, host): # type: (str) -> str """ @@ -1232,7 +1201,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.log.info('Removed host %s' % host) return "Removed host '{}'".format(host) - @trivial_completion + @handle_orch_error def update_host_addr(self, host: str, addr: str) -> str: self.inventory.set_addr(host, addr) self._reset_con(host) @@ -1240,7 +1209,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.log.info('Set host %s addr to %s' % (host, addr)) return "Updated host '{}' addr to '{}'".format(host, addr) - @trivial_completion + @handle_orch_error def get_hosts(self): # type: () -> List[orchestrator.HostSpec] """ @@ -1251,13 +1220,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, """ return list(self.inventory.all_specs()) - @trivial_completion + @handle_orch_error def add_host_label(self, host: str, label: str) -> str: self.inventory.add_label(host, label) self.log.info('Added label %s to host %s' % (label, host)) return 'Added label %s to host %s' % (label, host) - @trivial_completion + @handle_orch_error def remove_host_label(self, host: str, label: str) -> str: self.inventory.rm_label(host, label) self.log.info('Removed label %s to host %s' % (label, host)) @@ -1298,7 +1267,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, + 'Note the following:\n\n' + '\n'.join(notifications)) return 0, f'It is presumed safe to stop host {hostname}' - @trivial_completion + @handle_orch_error def host_ok_to_stop(self, hostname: str) -> str: if hostname not in self.cache.get_hosts(): raise OrchestratorError(f'Cannot find host "{hostname}"') @@ -1325,7 +1294,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, } self.set_health_checks(self.health_checks) - @trivial_completion + @handle_orch_error @host_exists() def enter_host_maintenance(self, hostname: str, force: bool = False) -> str: """ Attempt to place a cluster host in maintenance @@ -1395,7 +1364,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, return f"Ceph cluster {self._cluster_fsid} on {hostname} moved to maintenance" - @trivial_completion + @handle_orch_error @host_exists() def exit_host_maintenance(self, hostname: str) -> str: """Exit maintenance mode and return a host to an operational state @@ -1464,7 +1433,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self._kick_serve_loop() - @trivial_completion + @handle_orch_error def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> List[orchestrator.ServiceDescription]: if refresh: @@ -1564,7 +1533,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, sm[n].size = sm[n].size * 2 return list(sm.values()) - @trivial_completion + @handle_orch_error def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, @@ -1589,7 +1558,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, result.append(dd) return result - @trivial_completion + @handle_orch_error def service_action(self, action: str, service_name: str) -> List[str]: dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name) if not dds: @@ -1661,7 +1630,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id), }) - @trivial_completion + @handle_orch_error def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str: d = self.cache.get_daemon(daemon_name) assert d.daemon_type is not None @@ -1694,7 +1663,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self._kick_serve_loop() return msg - @trivial_completion + @handle_orch_error def remove_daemons(self, names): # type: (List[str]) -> List[str] args = [] @@ -1707,7 +1676,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.log.info('Remove daemons %s' % [a[0] for a in args]) return self._remove_daemons(args) - @trivial_completion + @handle_orch_error def remove_service(self, service_name: str) -> str: self.log.info('Remove service %s' % service_name) self._trigger_preview_refresh(service_name=service_name) @@ -1723,7 +1692,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, # must be idempotent: still a success. return f'Failed to remove service. <{service_name}> was not found.' - @trivial_completion + @handle_orch_error def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]: """ Return the storage inventory of hosts matching the given filter. @@ -1752,7 +1721,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, inventory.Devices(dls))) return result - @trivial_completion + @handle_orch_error def zap_device(self, host: str, path: str) -> str: self.log.info('Zap device %s:%s' % (host, path)) out, err, code = CephadmServe(self)._run_cephadm( @@ -1764,7 +1733,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) return '\n'.join(out + err) - @trivial_completion + @handle_orch_error def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]: """ Blink a device light. Calling something like:: @@ -1843,7 +1812,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.log.info(f"Marking host: {host} for OSDSpec preview refresh.") self.cache.osdspec_previews_refresh_queue.append(host) - @trivial_completion + @handle_orch_error def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]: """ Deprecated. Please use `apply()` instead. @@ -1852,7 +1821,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, """ return [self._apply(spec) for spec in specs] - @trivial_completion + @handle_orch_error def create_osds(self, drive_group: DriveGroupSpec) -> str: return self.osd_service.create_from_spec(drive_group) @@ -1974,16 +1943,16 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, return create_func_map(args) - @trivial_completion + @handle_orch_error def apply_mon(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def add_mon(self, spec): # type: (ServiceSpec) -> List[str] return self._add_daemon('mon', spec, self.mon_service.prepare_create) - @trivial_completion + @handle_orch_error def add_mgr(self, spec): # type: (ServiceSpec) -> List[str] return self._add_daemon('mgr', spec, self.mgr_service.prepare_create) @@ -2023,7 +1992,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, 'remove': [d.hostname for d in remove_daemon_hosts] } - @trivial_completion + @handle_orch_error def plan(self, specs: Sequence[GenericSpec]) -> List: results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n' 'to the current inventory setup. If any on these conditions changes, the \n' @@ -2074,59 +2043,59 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self._kick_serve_loop() return "Scheduled %s update..." % spec.service_name() - @trivial_completion + @handle_orch_error def apply(self, specs: Sequence[GenericSpec]) -> List[str]: results = [] for spec in specs: results.append(self._apply(spec)) return results - @trivial_completion + @handle_orch_error def apply_mgr(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def add_mds(self, spec: ServiceSpec) -> List[str]: return self._add_daemon('mds', spec, self.mds_service.prepare_create) - @trivial_completion + @handle_orch_error def apply_mds(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def add_rgw(self, spec: ServiceSpec) -> List[str]: return self._add_daemon('rgw', spec, self.rgw_service.prepare_create) - @trivial_completion + @handle_orch_error def apply_rgw(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def apply_ha_rgw(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def add_iscsi(self, spec): # type: (ServiceSpec) -> List[str] return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create) - @trivial_completion + @handle_orch_error def apply_iscsi(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def add_rbd_mirror(self, spec: ServiceSpec) -> List[str]: return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create) - @trivial_completion + @handle_orch_error def apply_rbd_mirror(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def add_nfs(self, spec: ServiceSpec) -> List[str]: return self._add_daemon('nfs', spec, self.nfs_service.prepare_create) - @trivial_completion + @handle_orch_error def apply_nfs(self, spec: ServiceSpec) -> str: return self._apply(spec) @@ -2134,73 +2103,73 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, # type: () -> str return self.get('mgr_map').get('services', {}).get('dashboard', '') - @trivial_completion + @handle_orch_error def add_prometheus(self, spec: ServiceSpec) -> List[str]: return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create) - @trivial_completion + @handle_orch_error def apply_prometheus(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def add_node_exporter(self, spec): # type: (ServiceSpec) -> List[str] return self._add_daemon('node-exporter', spec, self.node_exporter_service.prepare_create) - @trivial_completion + @handle_orch_error def apply_node_exporter(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def add_crash(self, spec): # type: (ServiceSpec) -> List[str] return self._add_daemon('crash', spec, self.crash_service.prepare_create) - @trivial_completion + @handle_orch_error def apply_crash(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def add_grafana(self, spec): # type: (ServiceSpec) -> List[str] return self._add_daemon('grafana', spec, self.grafana_service.prepare_create) - @trivial_completion + @handle_orch_error def apply_grafana(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def add_alertmanager(self, spec): # type: (ServiceSpec) -> List[str] return self._add_daemon('alertmanager', spec, self.alertmanager_service.prepare_create) - @trivial_completion + @handle_orch_error def apply_alertmanager(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def add_container(self, spec: ServiceSpec) -> List[str]: return self._add_daemon('container', spec, self.container_service.prepare_create) - @trivial_completion + @handle_orch_error def apply_container(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def add_cephadm_exporter(self, spec): # type: (ServiceSpec) -> List[str] return self._add_daemon('cephadm-exporter', spec, self.cephadm_exporter_service.prepare_create) - @trivial_completion + @handle_orch_error def apply_cephadm_exporter(self, spec: ServiceSpec) -> str: return self._apply(spec) - @trivial_completion + @handle_orch_error def upgrade_check(self, image: str, version: str) -> str: if self.inventory.get_host_with_state("maintenance"): raise OrchestratorError("check aborted - you have hosts in maintenance state") @@ -2237,29 +2206,29 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, return json.dumps(r, indent=4, sort_keys=True) - @trivial_completion + @handle_orch_error def upgrade_status(self) -> orchestrator.UpgradeStatusSpec: return self.upgrade.upgrade_status() - @trivial_completion + @handle_orch_error def upgrade_start(self, image: str, version: str) -> str: if self.inventory.get_host_with_state("maintenance"): raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state") return self.upgrade.upgrade_start(image, version) - @trivial_completion + @handle_orch_error def upgrade_pause(self) -> str: return self.upgrade.upgrade_pause() - @trivial_completion + @handle_orch_error def upgrade_resume(self) -> str: return self.upgrade.upgrade_resume() - @trivial_completion + @handle_orch_error def upgrade_stop(self) -> str: return self.upgrade.upgrade_stop() - @trivial_completion + @handle_orch_error def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False) -> str: @@ -2293,7 +2262,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self._kick_serve_loop() return "Scheduled OSD(s) for removal" - @trivial_completion + @handle_orch_error def stop_remove_osds(self, osd_ids: List[str]) -> str: """ Stops a `removal` process for a List of OSDs. @@ -2310,7 +2279,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self._kick_serve_loop() return "Stopped OSD(s) removal" - @trivial_completion + @handle_orch_error def remove_osds_status(self) -> List[OSD]: """ The CLI call to retrieve an osd removal report diff --git a/src/pybind/mgr/cephadm/tests/fixtures.py b/src/pybind/mgr/cephadm/tests/fixtures.py index 8ded190f31126..f3d27b352534b 100644 --- a/src/pybind/mgr/cephadm/tests/fixtures.py +++ b/src/pybind/mgr/cephadm/tests/fixtures.py @@ -1,4 +1,3 @@ -import time import fnmatch from contextlib import contextmanager @@ -12,7 +11,7 @@ except ImportError: pass from cephadm import CephadmOrchestrator -from orchestrator import raise_if_exception, Completion, HostSpec +from orchestrator import raise_if_exception, OrchResult, HostSpec from tests import mock @@ -64,31 +63,8 @@ def with_cephadm_module(module_options=None, store=None): def wait(m, c): - # type: (CephadmOrchestrator, Completion) -> Any - m.process([c]) - - try: - # if in debugger - import pydevd # noqa: F401 - in_debug = True - except ImportError: - in_debug = False - - if in_debug: - while True: # don't timeout - if c.is_finished: - raise_if_exception(c) - return c.result - time.sleep(0.1) - else: - for i in range(30): - if i % 10 == 0: - m.process([c]) - if c.is_finished: - raise_if_exception(c) - return c.result - time.sleep(0.1) - assert False, "timeout" + str(c._state) + # type: (CephadmOrchestrator, OrchResult) -> Any + return raise_if_exception(c) @contextmanager diff --git a/src/pybind/mgr/cephadm/tests/test_completion.py b/src/pybind/mgr/cephadm/tests/test_completion.py index 9480ea591ee40..327c12d2ad272 100644 --- a/src/pybind/mgr/cephadm/tests/test_completion.py +++ b/src/pybind/mgr/cephadm/tests/test_completion.py @@ -1,25 +1,10 @@ import pytest -from .fixtures import wait -from ..module import trivial_completion, forall_hosts +from ..module import forall_hosts class TestCompletion(object): - def test_trivial(self, cephadm_module): - @trivial_completion - def run(x): - return x + 1 - assert wait(cephadm_module, run(1)) == 2 - - def test_exception(self, cephadm_module): - @trivial_completion - def run(x): - raise ValueError - c = run(1) - with pytest.raises(ValueError): - wait(cephadm_module, c) - @pytest.mark.parametrize("input,expected", [ ([], []), ([1], ["(1,)"]), diff --git a/src/pybind/mgr/orchestrator/__init__.py b/src/pybind/mgr/orchestrator/__init__.py index 3842a84a3c384..d9837e0c08192 100644 --- a/src/pybind/mgr/orchestrator/__init__.py +++ b/src/pybind/mgr/orchestrator/__init__.py @@ -4,7 +4,7 @@ from .module import OrchestratorCli # usage: E.g. `from orchestrator import StatelessServiceSpec` from ._interface import \ - OrchResult, raise_if_exception, \ + OrchResult, raise_if_exception, handle_orch_error, \ CLICommand, _cli_write_command, _cli_read_command, CLICommandMeta, \ Orchestrator, OrchestratorClientMixin, \ OrchestratorValidationError, OrchestratorError, NoOrchestrator, \ -- 2.39.5