mgr/rook: Adapt to new orch interface
author Sebastian Wagner <sebastian.wagner@suse.com>
Mon, 8 Feb 2021 00:47:42 +0000 (01:47 +0100)
committer Sebastian Wagner <sebastian.wagner@suse.com>
Mon, 1 Mar 2021 15:48:54 +0000 (16:48 +0100)
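
Drop the RookCompletion / deferred_read / write_completion machinery and decorate the
orchestrator methods with handle_orch_error instead: each method now returns a plain
value (or raises), and the decorator wraps the outcome in an OrchResult. Callers unwrap
it with raise_if_exception, as create_osds() now does for get_hosts().

A minimal sketch of the new calling convention (the standalone function and its name are
illustrative only; it assumes handle_orch_error simply stores the return value, or any
raised exception, in the OrchResult):

    from orchestrator import OrchResult, handle_orch_error, raise_if_exception

    @handle_orch_error
    def get_mon_count() -> int:
        # Returns a plain int; the decorator wraps it in an OrchResult[int].
        # An exception raised here would be captured in the OrchResult
        # rather than propagating to the caller.
        return 3

    result = get_mon_count()            # OrchResult[int]
    count = raise_if_exception(result)  # 3, or re-raises the stored exception
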
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index 1db6818a33907acef162e778de90abef8504dbcb..2bd1ddbbc2c25497de686ad6b323aee9c52b39cc 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -33,6 +33,7 @@ except ImportError:
 
 from mgr_module import MgrModule, Option
 import orchestrator
+from orchestrator import handle_orch_error, OrchResult, raise_if_exception
 
 from .rook_cluster import RookCluster
 
@@ -41,42 +42,6 @@ FuncT = TypeVar('FuncT', bound=Callable)
 ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec)
 
 
-
-class RookCompletion(orchestrator.Completion[T]):
-    def evaluate(self) -> None:
-        self.finalize(None)
-
-
-def deferred_read(f):
-    # type: (Callable[..., T]) -> Callable[..., RookCompletion[T]]
-
-    # See https://stackoverflow.com/questions/65936408/typing-function-when-decorator-change-generic-return-type
-    """
-    Decorator to make RookOrchestrator methods return
-    a completion object that executes themselves.
-    """
-
-    @functools.wraps(f)
-    def wrapper(*args: Any, **kwargs: Any) -> RookCompletion[T]:
-        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))
-
-    return wrapper
-
-
-def write_completion(on_complete,  # type: Callable[[], T]
-                     message,  # type: str
-                     mgr: 'RookOrchestrator',
-                     calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
-                     ):
-    # type: (...) -> RookCompletion[T]
-    return RookCompletion.with_progress(
-        message=message,
-        mgr=mgr,
-        on_complete=lambda _: on_complete(),
-        calc_percent=calc_percent,
-    )
-
-
 class RookEnv(object):
     def __init__(self) -> None:
         # POD_NAMESPACE already exist for Rook 0.9
@@ -110,13 +75,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         # TODO: configure k8s API addr instead of assuming local
     ]
 
-    def process(self, completions: List[RookCompletion]) -> None:  # type: ignore
-        if completions:
-            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))
-
-            for p in completions:
-                p.evaluate()
-
     @staticmethod
     def can_run() -> Tuple[bool, str]:
         if not kubernetes_imported:
@@ -149,8 +107,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         self._shutdown = threading.Event()
 
-        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
-
     def shutdown(self) -> None:
         self._shutdown.set()
 
@@ -204,22 +160,9 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._initialized.set()
 
         while not self._shutdown.is_set():
-            # XXX hack (or is it?) to kick all completions periodically,
-            # in case we had a caller that wait()'ed on them long enough
-            # to get persistence but not long enough to get completion
-
-            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
-            for p in self.all_progress_references:
-                p.update()
-
             self._shutdown.wait(5)
 
-    def cancel_completions(self) -> None:
-        for p in self.all_progress_references:
-            p.fail()
-        self.all_progress_references.clear()
-
-    @deferred_read
+    @handle_orch_error
     def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
         host_list = None
         if host_filter and host_filter.hosts:
@@ -244,7 +187,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                         sys_api = dict(
                             rotational = '1' if d['rotational'] else '0',
                             size = d['size']
-                            ),
+                        ),
                         available = False,
                         rejected_reasons=['device data coming from ceph-volume not provided'],
                     ))
@@ -253,12 +196,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return result
 
-    @deferred_read
+    @handle_orch_error
     def get_hosts(self):
         # type: () -> List[orchestrator.HostSpec]
         return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
 
-    @deferred_read
+    @handle_orch_error
     def describe_service(self,
                          service_type: Optional[str] = None,
                          service_name: Optional[str] = None,
@@ -275,26 +218,26 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         spec = {}
         if service_type == 'mon' or service_type is None:
             spec['mon'] = orchestrator.ServiceDescription(
-                    spec=ServiceSpec(
-                        'mon',
-                        placement=PlacementSpec(
-                            count=cl['spec'].get('mon', {}).get('count', 1),
-                            ),
-                        ),
-                    size=cl['spec'].get('mon', {}).get('count', 1),
-                    container_image_name=image_name,
-                    last_refresh=now,
-                    )
+                spec=ServiceSpec(
+                    'mon',
+                    placement=PlacementSpec(
+                        count=cl['spec'].get('mon', {}).get('count', 1),
+                    ),
+                ),
+                size=cl['spec'].get('mon', {}).get('count', 1),
+                container_image_name=image_name,
+                last_refresh=now,
+            )
         if service_type == 'mgr' or service_type is None:
             spec['mgr'] = orchestrator.ServiceDescription(
-                    spec=ServiceSpec(
-                        'mgr',
-                        placement=PlacementSpec.from_string('count:1'),
-                        ),
-                    size=1,
-                    container_image_name=image_name,
-                    last_refresh=now,
-                    )
+                spec=ServiceSpec(
+                    'mgr',
+                    placement=PlacementSpec.from_string('count:1'),
+                ),
+                size=1,
+                container_image_name=image_name,
+                last_refresh=now,
+            )
         if not cl['spec'].get('crashCollector', {}).get('disable', False):
             spec['crash'] = orchestrator.ServiceDescription(
                 spec=ServiceSpec(
@@ -309,7 +252,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         if service_type == 'mds' or service_type is None:
             # CephFilesystems
             all_fs = self.rook_cluster.rook_api_get(
-                    "cephfilesystems/")
+                "cephfilesystems/")
             self.log.debug('CephFilesystems %s' % all_fs)
             for fs in all_fs.get('items', []):
                 svc = 'mds.' + fs['metadata']['name']
@@ -321,20 +264,20 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                     total_mds = active * 2
                     spec[svc] = orchestrator.ServiceDescription(
-                            spec=ServiceSpec(
-                                service_type='mds',
-                                service_id=fs['metadata']['name'],
-                                placement=PlacementSpec(count=active),
-                                ),
-                            size=total_mds,
-                            container_image_name=image_name,
-                            last_refresh=now,
-                            )
+                        spec=ServiceSpec(
+                            service_type='mds',
+                            service_id=fs['metadata']['name'],
+                            placement=PlacementSpec(count=active),
+                        ),
+                        size=total_mds,
+                        container_image_name=image_name,
+                        last_refresh=now,
+                    )
 
         if service_type == 'rgw' or service_type is None:
             # CephObjectstores
             all_zones = self.rook_cluster.rook_api_get(
-                    "cephobjectstores/")
+                "cephobjectstores/")
             self.log.debug('CephObjectstores %s' % all_zones)
             for zone in all_zones.get('items', []):
                 rgw_realm = zone['metadata']['name']
@@ -350,23 +293,23 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                     ssl = False
                     port = zone['spec']['gateway']['port'] or 80
                 spec[svc] = orchestrator.ServiceDescription(
-                        spec=RGWSpec(
-                            service_id=rgw_realm + '.' + rgw_zone,
-                            rgw_realm=rgw_realm,
-                            rgw_zone=rgw_zone,
-                            ssl=ssl,
-                            rgw_frontend_port=port,
-                            placement=PlacementSpec(count=active),
-                            ),
-                        size=active,
-                        container_image_name=image_name,
-                        last_refresh=now,
-                        )
+                    spec=RGWSpec(
+                        service_id=rgw_realm + '.' + rgw_zone,
+                        rgw_realm=rgw_realm,
+                        rgw_zone=rgw_zone,
+                        ssl=ssl,
+                        rgw_frontend_port=port,
+                        placement=PlacementSpec(count=active),
+                    ),
+                    size=active,
+                    container_image_name=image_name,
+                    last_refresh=now,
+                )
 
         if service_type == 'nfs' or service_type is None:
             # CephNFSes
             all_nfs = self.rook_cluster.rook_api_get(
-                    "cephnfses/")
+                "cephnfses/")
             self.log.warning('CephNFS %s' % all_nfs)
             for nfs in all_nfs.get('items', []):
                 nfs_name = nfs['metadata']['name']
@@ -375,15 +318,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                     continue
                 active = nfs['spec'].get('server', {}).get('active')
                 spec[svc] = orchestrator.ServiceDescription(
-                        spec=NFSServiceSpec(
-                            service_id=nfs_name,
-                            pool=nfs['spec']['rados']['pool'],
-                            namespace=nfs['spec']['rados'].get('namespace', None),
-                            placement=PlacementSpec(count=active),
-                            ),
-                        size=active,
-                        last_refresh=now,
-                        )
+                    spec=NFSServiceSpec(
+                        service_id=nfs_name,
+                        pool=nfs['spec']['rados']['pool'],
+                        namespace=nfs['spec']['rados'].get('namespace', None),
+                        placement=PlacementSpec(count=active),
+                    ),
+                    size=active,
+                    last_refresh=now,
+                )
 
         for dd in self._list_daemons():
             if dd.service_name() not in spec:
@@ -401,7 +344,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return [v for k, v in spec.items()]
 
-    @deferred_read
+    @handle_orch_error
     def list_daemons(self,
                      service_name: Optional[str] = None,
                      daemon_type: Optional[str] = None,
@@ -458,73 +401,48 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return result
 
-    def _service_add_decorate(self, typename: str, spec: ServiceSpecT, func: Callable[[ServiceSpecT], T]) -> RookCompletion[T]:
-        return write_completion(
-            on_complete=lambda : func(spec),
-            message="Creating {} services for {}".format(typename, spec.service_id),
-            mgr=self
-        )
-
-    def _service_rm_decorate(self, typename: str, name: str, func: Callable[[], T]) -> RookCompletion[T]:
-        return write_completion(
-            on_complete=lambda : func(),
-            message="Removing {} services for {}".format(typename, name),
-            mgr=self
-        )
-
-    def remove_service(self, service_name: str) -> RookCompletion[str]:
+    @handle_orch_error
+    def remove_service(self, service_name: str) -> str:
         service_type, service_name = service_name.split('.', 1)
         if service_type == 'mds':
-            return self._service_rm_decorate(
-                'MDS', service_name, lambda: self.rook_cluster.rm_service(
-                    'cephfilesystems', service_name)
-            )
+            return self.rook_cluster.rm_service('cephfilesystems', service_name)
         elif service_type == 'rgw':
-            return self._service_rm_decorate(
-                'RGW', service_name, lambda: self.rook_cluster.rm_service('cephobjectstores', service_name)
-            )
+            return self.rook_cluster.rm_service('cephobjectstores', service_name)
         elif service_type == 'nfs':
-            return self._service_rm_decorate(
-                'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
-            )
+            return self.rook_cluster.rm_service('cephnfses', service_name)
         else:
             raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
 
+    @handle_orch_error
     def apply_mon(self, spec):
-        # type: (ServiceSpec) -> RookCompletion[str]
+        # type: (ServiceSpec) -> str
         if spec.placement.hosts or spec.placement.label:
             raise RuntimeError("Host list or label is not supported by rook.")
 
-        return write_completion(
-            lambda: self.rook_cluster.update_mon_count(spec.placement.count),
-            "Updating mon count to {0}".format(spec.placement.count),
-            mgr=self
-        )
+        return self.rook_cluster.update_mon_count(spec.placement.count)
 
+    @handle_orch_error
     def apply_mds(self, spec):
-        # type: (ServiceSpec) -> RookCompletion[str]
-        return self._service_add_decorate('MDS', spec,
-                                          self.rook_cluster.apply_filesystem)
+        # type: (ServiceSpec) -> str
+        return self.rook_cluster.apply_filesystem(spec)
 
+    @handle_orch_error
     def apply_rgw(self, spec):
-        # type: (RGWSpec) -> RookCompletion[str]
-        return self._service_add_decorate('RGW', spec,
-                                          self.rook_cluster.apply_objectstore)
+        # type: (RGWSpec) -> str
+        return self.rook_cluster.apply_objectstore(spec)
 
+    @handle_orch_error
     def apply_nfs(self, spec):
-        # type: (NFSServiceSpec) -> RookCompletion[str]
-        return self._service_add_decorate("NFS", spec,
-                                          self.rook_cluster.apply_nfsgw)
+        # type: (NFSServiceSpec) -> str
+        return self.rook_cluster.apply_nfsgw(spec)
 
-    def remove_daemons(self, names: List[str]) -> RookCompletion[List[str]]:
-        return write_completion(
-            lambda: self.rook_cluster.remove_pods(names),
-            "Removing daemons {}".format(','.join(names)),
-            mgr=self
-        )
+    @handle_orch_error
+    def remove_daemons(self, names: List[str]) -> List[str]:
+        return self.rook_cluster.remove_pods(names)
 
+    @handle_orch_error
     def create_osds(self, drive_group):
-        # type: (DriveGroupSpec) -> RookCompletion[str]
+        # type: (DriveGroupSpec) -> str
         """ Creates OSDs from a drive group specification.
 
         $: ceph orch osd create -i <dg.file>
@@ -538,41 +456,36 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         if drive_group.data_directories:
             targets += drive_group.data_directories
 
-        def execute(all_hosts_):
-            # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
-            matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts_)
-
-            assert len(matching_hosts) == 1
-
-            if not self.rook_cluster.node_exists(matching_hosts[0]):
-                raise RuntimeError("Node '{0}' is not in the Kubernetes "
-                                   "cluster".format(matching_hosts))
-
-            # Validate whether cluster CRD can accept individual OSD
-            # creations (i.e. not useAllDevices)
-            if not self.rook_cluster.can_create_osd():
-                raise RuntimeError("Rook cluster configuration does not "
-                                   "support OSD creation.")
-
-            return orchestrator.Completion.with_progress(
-                message="Creating OSD on {0}:{1}".format(
-                        matching_hosts,
-                        targets),
-                mgr=self,
-                on_complete=lambda _:self.rook_cluster.add_osds(drive_group, matching_hosts),
-                calc_percent=lambda: has_osds(matching_hosts)
-            )
+        all_hosts = raise_if_exception(self.get_hosts())
 
-        @deferred_read
+        matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
+
+        assert len(matching_hosts) == 1
+
+        if not self.rook_cluster.node_exists(matching_hosts[0]):
+            raise RuntimeError("Node '{0}' is not in the Kubernetes "
+                               "cluster".format(matching_hosts))
+
+        # Validate whether cluster CRD can accept individual OSD
+        # creations (i.e. not useAllDevices)
+        if not self.rook_cluster.can_create_osd():
+            raise RuntimeError("Rook cluster configuration does not "
+                               "support OSD creation.")
+
+        return self.rook_cluster.add_osds(drive_group, matching_hosts)
+
+        # TODO: this was the code to update the progress reference:
+        """
+        @handle_orch_error
         def has_osds(matching_hosts: List[str]) -> bool:
 
             # Find OSD pods on this host
             pod_osd_ids = set()
             pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
-                                                 label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
-                                                 field_selector="spec.nodeName={0}".format(
-                                                     matching_hosts[0]
-                                                 )).items
+                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
+                                                field_selector="spec.nodeName={0}".format(
+                                                    matching_hosts[0]
+                                                )).items
             for p in pods:
                 pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
 
@@ -594,15 +507,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                     ))
 
             return found is not None
+        """
 
-        c = self.get_hosts().then(execute)
-        return c
-
-    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
-        return write_completion(
-            on_complete=lambda: self.rook_cluster.blink_light(
-                ident_fault, on, locs),
-            message="Switching <{}> identification light in {}".format(
-                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
-            mgr=self
-        )
+    @handle_orch_error
+    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
+        return self.rook_cluster.blink_light(ident_fault, on, locs)