]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/test_orchestrator: adapt to now orch interface
authorSebastian Wagner <sebastian.wagner@suse.com>
Mon, 8 Feb 2021 00:56:16 +0000 (01:56 +0100)
committerSebastian Wagner <sebastian.wagner@suse.com>
Mon, 1 Mar 2021 15:48:54 +0000 (16:48 +0100)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/test_orchestrator/module.py

index 8a62ba5c1fb98a8ef61eb8cece467714cd0f222e..1f527cba076572425b35d0469fa4d80dd7500c21 100644 (file)
@@ -20,41 +20,7 @@ from mgr_module import CLICommand, HandleCommandResult
 from mgr_module import MgrModule
 
 import orchestrator
-
-
-class TestCompletion(orchestrator.Completion):
-    def evaluate(self):
-        self.finalize(None)
-
-
-def deferred_read(f):
-    # type: (Callable) -> Callable[..., TestCompletion]
-    """
-    Decorator to make methods return
-    a completion object that executes themselves.
-    """
-
-    @functools.wraps(f)
-    def wrapper(*args, **kwargs):
-        return TestCompletion(on_complete=lambda _: f(*args, **kwargs))
-
-    return wrapper
-
-
-def deferred_write(message):
-    def inner(f):
-        # type: (Callable) -> Callable[..., TestCompletion]
-
-        @functools.wraps(f)
-        def wrapper(self, *args, **kwargs):
-            return TestCompletion.with_progress(
-                message=message,
-                mgr=self,
-                on_complete=lambda _: f(self, *args, **kwargs),
-            )
-
-        return wrapper
-    return inner
+from orchestrator import handle_orch_error, raise_if_exception
 
 
 class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
@@ -67,13 +33,6 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
     The implementation is similar to the Rook orchestrator, but simpler.
     """
 
-    def process(self, completions: List[TestCompletion]) -> None:  # type: ignore
-        if completions:
-            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))
-
-            for p in completions:
-                p.evaluate()
-
     @CLICommand('test_orchestrator load_data', perm='w')
     def _load_data(self, inbuf):
         """
@@ -98,7 +57,6 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._initialized = threading.Event()
         self._shutdown = threading.Event()
         self._init_data({})
-        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
 
     def shutdown(self):
         self._shutdown.set()
@@ -108,14 +66,6 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._initialized.set()
 
         while not self._shutdown.is_set():
-            # XXX hack (or is it?) to kick all completions periodically,
-            # in case we had a caller that wait()'ed on them long enough
-            # to get persistence but not long enough to get completion
-
-            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
-            for p in self.all_progress_references:
-                p.update()
-
             self._shutdown.wait(5)
 
     def _init_data(self, data=None):
@@ -126,7 +76,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._daemons = [orchestrator.DaemonDescription.from_json(daemon)
                           for daemon in data.get('daemons', [])]
 
-    @deferred_read
+    @handle_orch_error
     def get_inventory(self, host_filter=None, refresh=False):
         """
         There is no guarantee which devices are returned by get_inventory.
@@ -190,7 +140,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
             daemons.append(daemon)
         return daemons
 
-    @deferred_read
+    @handle_orch_error
     def describe_service(self, service_type=None, service_name=None, refresh=False):
         if self._services:
             # Dummy data
@@ -218,7 +168,7 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return list(filter(_filter_func, services))
 
-    @deferred_read
+    @handle_orch_error
     def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
         """
         There is no guarantee which daemons are returned by describe_service, except that
@@ -246,8 +196,9 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
     def preview_drivegroups(self, drive_group_name=None, dg_specs=None):
         return [{}]
 
+    @handle_orch_error
     def create_osds(self, drive_group):
-        # type: (DriveGroupSpec) -> TestCompletion
+        # type: (DriveGroupSpec) -> str
         """ Creates OSDs from a drive group specification.
 
         $: ceph orch osd create -i <dg.file>
@@ -255,105 +206,98 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
         The drivegroup file must only contain one spec at a time.
         """
 
-        def run(all_hosts):
-            # type: (List[orchestrator.HostSpec]) -> None
-            drive_group.validate()
+        drive_group.validate()
+        all_hosts = raise_if_exception(self.get_hosts())
 
-            def get_hosts_func(label=None, as_hostspec=False):
-                if as_hostspec:
-                    return all_hosts
-                return [h.hostname for h in all_hosts]
 
-            if not drive_group.placement.filter_matching_hosts(get_hosts_func):
-                raise orchestrator.OrchestratorValidationError('failed to match')
+        def get_hosts_func(label=None, as_hostspec=False):
+            if as_hostspec:
+                return all_hosts
+            return [h.hostname for h in all_hosts]
 
-        return self.get_hosts().then(run).then(
-            on_complete=orchestrator.ProgressReference(
-                message='create_osds',
-                mgr=self,
-            )
-        )
+        if not drive_group.placement.filter_matching_hosts(get_hosts_func):
+            raise orchestrator.OrchestratorValidationError('failed to match')
+        return ''
 
+    @handle_orch_error
     def apply_drivegroups(self, specs):
-        # type: (List[DriveGroupSpec]) -> TestCompletion
+        # type: (List[DriveGroupSpec]) -> List[str]
         drive_group = specs[0]
 
-        def run(all_hosts):
-            # type: (List[orchestrator.HostSpec]) -> None
-            drive_group.validate()
-
-            def get_hosts_func(label=None, as_hostspec=False):
-                if as_hostspec:
-                    return all_hosts
-                return [h.hostname for h in all_hosts]
-
-            if not drive_group.placement.filter_matching_hosts(get_hosts_func):
-                raise orchestrator.OrchestratorValidationError('failed to match')
-        return self.get_hosts().then(run).then(
-            on_complete=orchestrator.ProgressReference(
-                message='apply_drivesgroups',
-                mgr=self,
-            )
-        )
-
-    @deferred_write("remove_daemons")
+        all_hosts = raise_if_exception(self.get_hosts())
+
+        drive_group.validate()
+
+        def get_hosts_func(label=None, as_hostspec=False):
+            if as_hostspec:
+                return all_hosts
+            return [h.hostname for h in all_hosts]
+
+        if not drive_group.placement.filter_matching_hosts(get_hosts_func):
+            raise orchestrator.OrchestratorValidationError('failed to match')
+        return []
+
+    @handle_orch_error
     def remove_daemons(self, names):
         assert isinstance(names, list)
+        return 'done'
 
-    @deferred_write("remove_service")
+    @handle_orch_error
     def remove_service(self, service_name):
         assert isinstance(service_name, str)
+        return 'done'
 
-    @deferred_write("blink_device_light")
+    @handle_orch_error
     def blink_device_light(self, ident_fault, on, locations):
         assert ident_fault in ("ident", "fault")
         assert len(locations)
         return ''
 
-    @deferred_write("service_action")
+    @handle_orch_error
     def service_action(self, action, service_name):
-        pass
+        return 'done'
 
-    @deferred_write("daemon_action")
+    @handle_orch_error
     def daemon_action(self, action, daemon_name, image=None):
-        pass
+        return 'done'
 
-    @deferred_write("Adding NFS service")
+    @handle_orch_error
     def add_nfs(self, spec):
-        # type: (NFSServiceSpec) -> None
+        # type: (NFSServiceSpec) -> List[str]
         assert isinstance(spec.pool, str)
+        return [spec.one_line_str()]
 
-    @deferred_write("apply_nfs")
+    @handle_orch_error
     def apply_nfs(self, spec):
-        pass
+        return spec.one_line_str()
 
-    @deferred_write("add_iscsi")
+    @handle_orch_error
     def add_iscsi(self, spec):
-        # type: (IscsiServiceSpec) -> None
-        pass
+        # type: (IscsiServiceSpec) -> List[str]
+        return [spec.one_line_str()]
 
-    @deferred_write("apply_iscsi")
+    @handle_orch_error
     def apply_iscsi(self, spec):
-        # type: (IscsiServiceSpec) -> None
-        pass
+        # type: (IscsiServiceSpec) -> str
+        return spec.one_line_str()
 
-    @deferred_write("add_mds")
+    @handle_orch_error
     def add_mds(self, spec):
-        pass
+        return 'done'
 
-    @deferred_write("add_rgw")
+    @handle_orch_error
     def add_rgw(self, spec):
-        pass
+        return 'done'
 
-    @deferred_read
+    @handle_orch_error
     def get_hosts(self):
         if self._inventory:
             return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory]
         return [orchestrator.HostSpec('localhost')]
 
-    @deferred_write("add_host")
+    @handle_orch_error
     def add_host(self, spec):
-        # type: (orchestrator.HostSpec) -> None
+        # type: (orchestrator.HostSpec) -> str
         host = spec.hostname
         if host == 'raise_validation_error':
             raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
@@ -368,22 +312,27 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
         if host == 'raise_import_error':
             raise ImportError("test_orchestrator not enabled")
         assert isinstance(host, str)
+        return ''
 
-    @deferred_write("remove_host")
+    @handle_orch_error
     def remove_host(self, host):
         assert isinstance(host, str)
+        return 'done'
 
-    @deferred_write("apply_mgr")
+    @handle_orch_error
     def apply_mgr(self, spec):
-        # type: (ServiceSpec) -> None
+        # type: (ServiceSpec) -> str
 
         assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
         assert all([isinstance(h, str) for h in spec.placement.hosts])
+        return spec.one_line_str()
 
-    @deferred_write("apply_mon")
+    @handle_orch_error
     def apply_mon(self, spec):
-        # type: (ServiceSpec) -> None
+        # type: (ServiceSpec) -> str
 
         assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
         assert all([isinstance(h[0], str) for h in spec.placement.hosts])
         assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts])
+        return spec.one_line_str()
+