]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rook: Adapt to new completions
authorSebastian Wagner <sebastian.wagner@suse.com>
Fri, 30 Aug 2019 13:10:10 +0000 (15:10 +0200)
committerSebastian Wagner <sebastian.wagner@suse.com>
Wed, 27 Nov 2019 12:35:24 +0000 (13:35 +0100)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/rook/module.py

index 41c4c442d01ec1b39595a914831a5e14d24012b6..473e21fb534756c2641efec60060e56bcf5e968c 100644 (file)
@@ -1,16 +1,18 @@
 import threading
 import functools
 import os
-import uuid
 
 from ceph.deployment import inventory
 
 try:
-    from typing import List, Dict, Optional, Callable
+    from typing import List, Dict, Optional, Callable, TypeVar
     from ceph.deployment.drive_group import DriveGroupSpec
 except ImportError:
     pass  # just for type checking
 
+T = TypeVar('T')
+
+
 try:
     from kubernetes import client, config
     from kubernetes.client.rest import ApiException
@@ -34,38 +36,39 @@ import orchestrator
 from .rook_cluster import RookCluster
 
 
-class RookWriteCompletion(orchestrator.Completion):
-    """
-    Writes are a two-phase thing, firstly sending
-    the write to the k8s API (fast) and then waiting
-    for the corresponding change to appear in the
-    Ceph cluster (slow)
-    """
-    def __init__(self, message):
-        self.progress_reference = orchestrator.ProgressReference(
-            message=message
-        )
-        super(RookWriteCompletion, self).__init__()
+class RookCompletion(orchestrator.Completion[T]):
+    def evaluate(self):
+        self._first_promise.finalize(None)
 
 
 def deferred_read(f):
-    # type: (Callable) -> Callable[..., orchestrator.Completion]
+    # type: (Callable[..., T]) -> Callable[..., RookCompletion[T]]
     """
     Decorator to make RookOrchestrator methods return
     a completion object that executes themselves.
     """
 
     @functools.wraps(f)
-    def wrapper(self, *args, **kwargs):
-        c = orchestrator.Completion()
-        c.then(
-            lambda: f(*args, **kwargs)
-        )
-        return c
+    def wrapper(*args, **kwargs):
+        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))
 
     return wrapper
 
 
+def write_completion(on_complete,  # type: Callable[[], T]
+                     message,  # type: str
+                     mgr,
+                     calc_percent=None  # type: Optional[Callable[[], RookCompletion[float]]]
+                     ):
+    # type: (...) -> RookCompletion[T]
+    return RookCompletion.with_progress(
+        message=message,
+        mgr=mgr,
+        on_complete=lambda _: on_complete(),
+        calc_percent=calc_percent,
+    )
+
+
 class RookEnv(object):
     def __init__(self):
         # POD_NAMESPACE already exist for Rook 0.9
@@ -86,39 +89,27 @@ class RookEnv(object):
 
 
 class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
+    """
+    Writes are a two-phase thing, firstly sending
+    the write to the k8s API (fast) and then waiting
+    for the corresponding change to appear in the
+    Ceph cluster (slow)
+
+    Right now, wre calling the k8s API synchronously.
+    """
+
     MODULE_OPTIONS = [
         # TODO: configure k8s API addr instead of assuming local
     ]
 
     def process(self, completions):
-        pass
-
-    def process_promises(self, promises):
-        # type: (List[RookPromise]) -> None
-
-
-        if promises:
-            self.log.info("wait: promises={0}".format(promises))
-
-        # Synchronously call the K8s API
-        for p in promises:
-            if not isinstance(p, RookPromise):
-                raise TypeError(
-                    "wait() requires list of completions, not {0}".format(
-                        p.__class__
-                    ))
-
-            if not p.needs_result:
-                continue
-
-            new_promise = p.execute()
-            if c.exception and not isinstance(c.exception, orchestrator.OrchestratorError):
-                self.log.exception("Completion {0} threw an exception:".format(
-                    c.message
-                ))
-                c._complete = True
+        # type: (List[RookCompletion]) -> None
 
+        if completions:
+            self.log.info("wait: promises={0}".format(completions))
 
+            for p in completions:
+                p.evaluate()
 
     @staticmethod
     def can_run():
@@ -151,7 +142,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         self._shutdown = threading.Event()
 
-        self.all_promises = list()  # type: List[RookPromise]
+        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
 
     def shutdown(self):
         self._shutdown.set()
@@ -208,8 +199,9 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             # in case we had a caller that wait()'ed on them long enough
             # to get persistence but not long enough to get completion
 
-            self.process(self.all_promises)
-            self.all_promises = [p for p in self.all_promises if not p.completion.is_finished]
+            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
+            for p in self.all_progress_references:
+                p.update()
 
             self._shutdown.wait(5)
 
@@ -247,6 +239,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
     @deferred_read
     def get_hosts(self):
+        # type: () -> List[orchestrator.InventoryNode]
         return [orchestrator.InventoryNode(n, inventory.Devices([])) for n in self.rook_cluster.get_node_names()]
 
     @deferred_read
@@ -299,43 +292,55 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         return result
 
     def _service_add_decorate(self, typename, spec, func):
-        return RookWriteCompletion(lambda: func(spec), None,
-                    "Creating {0} services for {1}".format(typename, spec.name))
+        return write_completion(
+            on_complete=lambda : func(spec),
+            message="Creating {} services for {}".format(typename, spec.name),
+            mgr=self
+        )
 
     def add_mds(self, spec):
-        return self._service_add_decorate("Filesystem", spec,
-                                          self.rook_cluster.add_filesystem)
+        return self._service_add_decorate('MDS', spec,
+                                       self.rook_cluster.add_filesystem)
 
     def add_rgw(self, spec):
-        return self._service_add_decorate("RGW", spec,
-                                          self.rook_cluster.add_objectstore)
+        return self._service_add_decorate('RGW', spec,
+                                       self.rook_cluster.add_objectstore)
 
     def add_nfs(self, spec):
         return self._service_add_decorate("NFS", spec,
                                           self.rook_cluster.add_nfsgw)
 
+    def _service_rm_decorate(self, typename, name, func):
+        return write_completion(
+            on_complete=lambda : func(name),
+            message="Removing {} services for {}".format(typename, name),
+            mgr=self
+        )
+
     def remove_mds(self, name):
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.rm_service('cephfilesystems', name), None,
-            "Removing {0} services for {1}".format('mds', name))
+        return self._service_rm_decorate(
+            'MDS', name, lambda: self.rook_cluster.rm_service('cephfilesystems', name)
+        )
 
     def remove_rgw(self, zone):
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.rm_service('cephobjectstores', zone), None,
-            "Removing {0} services for {1}".format('rgw', zone))
+        return self._service_rm_decorate(
+            'RGW', zone, lambda: self.rook_cluster.rm_service('cephobjectstores', zone)
+        )
 
     def remove_nfs(self, name):
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.rm_service('cephnfses', name), None,
-            "Removing {0} services for {1}".format('nfs', name))
+        return self._service_rm_decorate(
+            'NFS', name, lambda: self.rook_cluster.rm_service('cephnfses', name)
+        )
 
     def update_mons(self, num, hosts):
         if hosts:
             raise RuntimeError("Host list is not supported by rook.")
 
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.update_mon_count(num), None,
-            "Updating mon count to {0}".format(num))
+        return write_completion(
+            lambda: self.rook_cluster.update_mon_count(num),
+            "Updating mon count to {0}".format(num),
+            mgr=self
+        )
 
     def update_mds(self, spec):
         num = spec.count
@@ -345,27 +350,23 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
     def update_nfs(self, spec):
         num = spec.count
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.update_nfs_count(spec.name, num), None,
-                "Updating NFS server count in {0} to {1}".format(spec.name, num))
+        return write_completion(
+            lambda: self.rook_cluster.update_nfs_count(spec.name, num),
+            "Updating NFS server count in {0} to {1}".format(spec.name, num),
+            mgr=self
+        )
 
-    def create_osds(self, drive_group, _):
-        # type: (DriveGroupSpec, List[str]) -> RookWriteCompletion
+    def create_osds(self, drive_group):
+        # type: (DriveGroupSpec) -> RookCompletion
 
-        targets = []
+        targets = []  # type: List[str]
         if drive_group.data_devices:
             targets += drive_group.data_devices.paths
         if drive_group.data_directories:
             targets += drive_group.data_directories
 
-        p = orchestrator.ProgressReference(
-            "Creating OSD on {0}:{1}".format(drive_group.hosts(drive_group.host_pattern),
-                                             targets))
-
-        def execute(all_hosts):
-            p.effective_when(
-                lambda hosts: has_osds
-            )
+        def execute(all_hosts_):
+            all_hosts = orchestrator.InventoryNode.get_host_names(all_hosts_)
 
             assert len(drive_group.hosts(all_hosts)) == 1
 
@@ -378,7 +379,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             if not self.rook_cluster.can_create_osd():
                 raise RuntimeError("Rook cluster configuration does not "
                                    "support OSD creation.")
-            return self.rook_cluster.add_osds(drive_group, all_hosts)
+
+            return orchestrator.Completion.with_progress(
+                message="Creating OSD on {0}:{1}".format(
+                        drive_group.hosts(drive_group.host_pattern),
+                        targets),
+                mgr=self,
+                on_complete=lambda _:self.rook_cluster.add_osds(drive_group, all_hosts),
+                calc_percent=lambda: has_osds(all_hosts)
+            )
 
         @deferred_read
         def has_osds(all_hosts):
@@ -411,8 +420,5 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
             return found is not None
 
-
         c = self.get_hosts().then(execute)
-        c.progress_reference = p
         return c
-