git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rook: Adapt to new orch interface
author Sebastian Wagner <sebastian.wagner@suse.com>
Mon, 8 Feb 2021 00:47:42 +0000 (01:47 +0100)
committer Sebastian Wagner <sebastian.wagner@suse.com>
Tue, 9 Mar 2021 14:29:32 +0000 (15:29 +0100)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
(cherry picked from commit 42d5c8991a9b806b960623386c95128515468c6d)

Conflicts:
src/pybind/mgr/rook/module.py

src/pybind/mgr/rook/module.py
src/pybind/mgr/rook/rook_cluster.py

index 45c0097dbd3c859d01824b9ea58888bdad5e8d03..cd9f578a180475f9bf47c6b88047bb5c7e624b9b 100644 (file)
@@ -32,41 +32,11 @@ except ImportError:
 
 from mgr_module import MgrModule, Option
 import orchestrator
+from orchestrator import handle_orch_error, OrchResult, raise_if_exception
 
 from .rook_cluster import RookCluster
 
 
-class RookCompletion(orchestrator.Completion):
-    def evaluate(self):
-        self.finalize(None)
-
-
-def deferred_read(f):
-    # type: (Callable) -> Callable[..., RookCompletion]
-    """
-    Decorator to make RookOrchestrator methods return
-    a completion object that executes themselves.
-    """
-
-    @functools.wraps(f)
-    def wrapper(*args, **kwargs):
-        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))
-
-    return wrapper
-
-
-def write_completion(on_complete,  # type: Callable
-                     message,  # type: str
-                     mgr,
-                     calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
-                     ):
-    # type: (...) -> RookCompletion
-    return RookCompletion.with_progress(
-        message=message,
-        mgr=mgr,
-        on_complete=lambda _: on_complete(),
-        calc_percent=calc_percent,
-    )
 
 
 class RookEnv(object):
@@ -102,13 +72,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         # TODO: configure k8s API addr instead of assuming local
     ]
 
-    def process(self, completions: List[RookCompletion]) -> None:  # type: ignore
-        if completions:
-            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))
-
-            for p in completions:
-                p.evaluate()
-
     @staticmethod
     def can_run():
         if not kubernetes_imported:
@@ -141,9 +104,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         self._shutdown = threading.Event()
 
-        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
-
-    def shutdown(self):
+    def shutdown(self) -> None:
         self._shutdown.set()
 
     @property
@@ -196,22 +157,9 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._initialized.set()
 
         while not self._shutdown.is_set():
-            # XXX hack (or is it?) to kick all completions periodically,
-            # in case we had a caller that wait()'ed on them long enough
-            # to get persistence but not long enough to get completion
-
-            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
-            for p in self.all_progress_references:
-                p.update()
-
             self._shutdown.wait(5)
 
-    def cancel_completions(self):
-        for p in self.all_progress_references:
-            p.fail()
-        self.all_progress_references.clear()
-
-    @deferred_read
+    @handle_orch_error
     def get_inventory(self, host_filter=None, refresh=False):
         host_list = None
         if host_filter and host_filter.hosts:
@@ -236,7 +184,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                         sys_api = dict(
                             rotational = '1' if d['rotational'] else '0',
                             size = d['size']
-                            ),
+                        ),
                         available = False,
                         rejected_reasons=['device data coming from ceph-volume not provided'],
                     ))
@@ -245,12 +193,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return result
 
-    @deferred_read
+    @handle_orch_error
     def get_hosts(self):
         # type: () -> List[orchestrator.HostSpec]
         return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
 
-    @deferred_read
+    @handle_orch_error
     def describe_service(self, service_type=None, service_name=None,
                          refresh=False):
         now = datetime.datetime.utcnow()
@@ -265,26 +213,26 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         spec = {}
         if service_type == 'mon' or service_type is None:
             spec['mon'] = orchestrator.ServiceDescription(
-                    spec=ServiceSpec(
-                        'mon',
-                        placement=PlacementSpec(
-                            count=cl['spec'].get('mon', {}).get('count', 1),
-                            ),
-                        ),
-                    size=cl['spec'].get('mon', {}).get('count', 1),
-                    container_image_name=image_name,
-                    last_refresh=now,
-                    )
+                spec=ServiceSpec(
+                    'mon',
+                    placement=PlacementSpec(
+                        count=cl['spec'].get('mon', {}).get('count', 1),
+                    ),
+                ),
+                size=cl['spec'].get('mon', {}).get('count', 1),
+                container_image_name=image_name,
+                last_refresh=now,
+            )
         if service_type == 'mgr' or service_type is None:
             spec['mgr'] = orchestrator.ServiceDescription(
-                    spec=ServiceSpec(
-                        'mgr',
-                        placement=PlacementSpec.from_string('count:1'),
-                        ),
-                    size=1,
-                    container_image_name=image_name,
-                    last_refresh=now,
-                    )
+                spec=ServiceSpec(
+                    'mgr',
+                    placement=PlacementSpec.from_string('count:1'),
+                ),
+                size=1,
+                container_image_name=image_name,
+                last_refresh=now,
+            )
         if not cl['spec'].get('crashCollector', {}).get('disable', False):
             spec['crash'] = orchestrator.ServiceDescription(
                 spec=ServiceSpec(
@@ -299,7 +247,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         if service_type == 'mds' or service_type is None:
             # CephFilesystems
             all_fs = self.rook_cluster.rook_api_get(
-                    "cephfilesystems/")
+                "cephfilesystems/")
             self.log.debug('CephFilesystems %s' % all_fs)
             for fs in all_fs.get('items', []):
                 svc = 'mds.' + fs['metadata']['name']
@@ -311,20 +259,20 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                     total_mds = active * 2
                     spec[svc] = orchestrator.ServiceDescription(
-                            spec=ServiceSpec(
-                                service_type='mds',
-                                service_id=fs['metadata']['name'],
-                                placement=PlacementSpec(count=active),
-                                ),
-                            size=total_mds,
-                            container_image_name=image_name,
-                            last_refresh=now,
-                            )
+                        spec=ServiceSpec(
+                            service_type='mds',
+                            service_id=fs['metadata']['name'],
+                            placement=PlacementSpec(count=active),
+                        ),
+                        size=total_mds,
+                        container_image_name=image_name,
+                        last_refresh=now,
+                    )
 
         if service_type == 'rgw' or service_type is None:
             # CephObjectstores
             all_zones = self.rook_cluster.rook_api_get(
-                    "cephobjectstores/")
+                "cephobjectstores/")
             self.log.debug('CephObjectstores %s' % all_zones)
             for zone in all_zones.get('items', []):
                 rgw_realm = zone['metadata']['name']
@@ -340,23 +288,23 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                     ssl = False
                     port = zone['spec']['gateway']['port'] or 80
                 spec[svc] = orchestrator.ServiceDescription(
-                        spec=RGWSpec(
-                            service_id=rgw_realm + '.' + rgw_zone,
-                            rgw_realm=rgw_realm,
-                            rgw_zone=rgw_zone,
-                            ssl=ssl,
-                            rgw_frontend_port=port,
-                            placement=PlacementSpec(count=active),
-                            ),
-                        size=active,
-                        container_image_name=image_name,
-                        last_refresh=now,
-                        )
+                    spec=RGWSpec(
+                        service_id=rgw_realm + '.' + rgw_zone,
+                        rgw_realm=rgw_realm,
+                        rgw_zone=rgw_zone,
+                        ssl=ssl,
+                        rgw_frontend_port=port,
+                        placement=PlacementSpec(count=active),
+                    ),
+                    size=active,
+                    container_image_name=image_name,
+                    last_refresh=now,
+                )
 
         if service_type == 'nfs' or service_type is None:
             # CephNFSes
             all_nfs = self.rook_cluster.rook_api_get(
-                    "cephnfses/")
+                "cephnfses/")
             self.log.warning('CephNFS %s' % all_nfs)
             for nfs in all_nfs.get('items', []):
                 nfs_name = nfs['metadata']['name']
@@ -365,15 +313,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                     continue
                 active = nfs['spec'].get('server', {}).get('active')
                 spec[svc] = orchestrator.ServiceDescription(
-                        spec=NFSServiceSpec(
-                            service_id=nfs_name,
-                            pool=nfs['spec']['rados']['pool'],
-                            namespace=nfs['spec']['rados'].get('namespace', None),
-                            placement=PlacementSpec(count=active),
-                            ),
-                        size=active,
-                        last_refresh=now,
-                        )
+                    spec=NFSServiceSpec(
+                        service_id=nfs_name,
+                        pool=nfs['spec']['rados']['pool'],
+                        namespace=nfs['spec']['rados'].get('namespace', None),
+                        placement=PlacementSpec(count=active),
+                    ),
+                    size=active,
+                    last_refresh=now,
+                )
 
         for dd in self._list_daemons():
             if dd.service_name() not in spec:
@@ -391,7 +339,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return [v for k, v in spec.items()]
 
-    @deferred_read
+    @handle_orch_error
     def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
                      refresh=False):
         return self._list_daemons(service_name=service_name,
@@ -440,71 +388,48 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return result
 
-    def _service_add_decorate(self, typename, spec, func):
-        return write_completion(
-            on_complete=lambda : func(spec),
-            message="Creating {} services for {}".format(typename, spec.service_id),
-            mgr=self
-        )
-
-    def _service_rm_decorate(self, typename, name, func):
-        return write_completion(
-            on_complete=lambda : func(),
-            message="Removing {} services for {}".format(typename, name),
-            mgr=self
-        )
-
-    def remove_service(self, service_name):
+    @handle_orch_error
+    def remove_service(self, service_name: str) -> str:
         service_type, service_name = service_name.split('.', 1)
         if service_type == 'mds':
-            return self._service_rm_decorate(
-                'MDS', service_name, lambda: self.rook_cluster.rm_service(
-                    'cephfilesystems', service_name)
-            )
+            return self.rook_cluster.rm_service('cephfilesystems', service_name)
         elif service_type == 'rgw':
-            return self._service_rm_decorate(
-                'RGW', service_name, lambda: self.rook_cluster.rm_service('cephobjectstores', service_name)
-            )
+            return self.rook_cluster.rm_service('cephobjectstores', service_name)
         elif service_type == 'nfs':
-            return self._service_rm_decorate(
-                'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
-            )
+            return self.rook_cluster.rm_service('cephnfses', service_name)
+        else:
+            raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
 
+    @handle_orch_error
     def apply_mon(self, spec):
-        # type: (ServiceSpec) -> RookCompletion
+        # type: (ServiceSpec) -> str
         if spec.placement.hosts or spec.placement.label:
             raise RuntimeError("Host list or label is not supported by rook.")
 
-        return write_completion(
-            lambda: self.rook_cluster.update_mon_count(spec.placement.count),
-            "Updating mon count to {0}".format(spec.placement.count),
-            mgr=self
-        )
+        return self.rook_cluster.update_mon_count(spec.placement.count)
 
+    @handle_orch_error
     def apply_mds(self, spec):
-        # type: (ServiceSpec) -> RookCompletion
-        return self._service_add_decorate('MDS', spec,
-                                          self.rook_cluster.apply_filesystem)
+        # type: (ServiceSpec) -> str
+        return self.rook_cluster.apply_filesystem(spec)
 
+    @handle_orch_error
     def apply_rgw(self, spec):
-        # type: (RGWSpec) -> RookCompletion
-        return self._service_add_decorate('RGW', spec,
-                                          self.rook_cluster.apply_objectstore)
+        # type: (RGWSpec) -> str
+        return self.rook_cluster.apply_objectstore(spec)
 
+    @handle_orch_error
     def apply_nfs(self, spec):
-        # type: (NFSServiceSpec) -> RookCompletion
-        return self._service_add_decorate("NFS", spec,
-                                          self.rook_cluster.apply_nfsgw)
+        # type: (NFSServiceSpec) -> str
+        return self.rook_cluster.apply_nfsgw(spec)
 
-    def remove_daemons(self, names):
-        return write_completion(
-            lambda: self.rook_cluster.remove_pods(names),
-            "Removing daemons {}".format(','.join(names)),
-            mgr=self
-        )
+    @handle_orch_error
+    def remove_daemons(self, names: List[str]) -> List[str]:
+        return self.rook_cluster.remove_pods(names)
 
+    @handle_orch_error
     def create_osds(self, drive_group):
-        # type: (DriveGroupSpec) -> RookCompletion
+        # type: (DriveGroupSpec) -> str
         """ Creates OSDs from a drive group specification.
 
         $: ceph orch osd create -i <dg.file>
@@ -518,41 +443,36 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         if drive_group.data_directories:
             targets += drive_group.data_directories
 
-        def execute(all_hosts_):
-            # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
-            matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts_)
-
-            assert len(matching_hosts) == 1
-
-            if not self.rook_cluster.node_exists(matching_hosts[0]):
-                raise RuntimeError("Node '{0}' is not in the Kubernetes "
-                                   "cluster".format(matching_hosts))
-
-            # Validate whether cluster CRD can accept individual OSD
-            # creations (i.e. not useAllDevices)
-            if not self.rook_cluster.can_create_osd():
-                raise RuntimeError("Rook cluster configuration does not "
-                                   "support OSD creation.")
-
-            return orchestrator.Completion.with_progress(
-                message="Creating OSD on {0}:{1}".format(
-                        matching_hosts,
-                        targets),
-                mgr=self,
-                on_complete=lambda _:self.rook_cluster.add_osds(drive_group, matching_hosts),
-                calc_percent=lambda: has_osds(matching_hosts)
-            )
+        all_hosts = raise_if_exception(self.get_hosts())
 
-        @deferred_read
-        def has_osds(matching_hosts):
+        matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
+
+        assert len(matching_hosts) == 1
+
+        if not self.rook_cluster.node_exists(matching_hosts[0]):
+            raise RuntimeError("Node '{0}' is not in the Kubernetes "
+                               "cluster".format(matching_hosts))
+
+        # Validate whether cluster CRD can accept individual OSD
+        # creations (i.e. not useAllDevices)
+        if not self.rook_cluster.can_create_osd():
+            raise RuntimeError("Rook cluster configuration does not "
+                               "support OSD creation.")
+
+        return self.rook_cluster.add_osds(drive_group, matching_hosts)
+
+        # TODO: this was the code to update the progress reference:
+        """
+        @handle_orch_error
+        def has_osds(matching_hosts: List[str]) -> bool:
 
             # Find OSD pods on this host
             pod_osd_ids = set()
             pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
-                                                 label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
-                                                 field_selector="spec.nodeName={0}".format(
-                                                     matching_hosts[0]
-                                                 )).items
+                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
+                                                field_selector="spec.nodeName={0}".format(
+                                                    matching_hosts[0]
+                                                )).items
             for p in pods:
                 pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
 
@@ -574,15 +494,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                     ))
 
             return found is not None
+        """
 
-        c = self.get_hosts().then(execute)
-        return c
-
-    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
-        return write_completion(
-            on_complete=lambda: self.rook_cluster.blink_light(
-                ident_fault, on, locs),
-            message="Switching <{}> identification light in {}".format(
-                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
-            mgr=self
-        )
+    @handle_orch_error
+    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
+        return self.rook_cluster.blink_light(ident_fault, on, locs)
index c842b3579cadabcb05f67d449645a2cce2189cec..197e1318dd1a292380c954d38535b4a935e24065 100644 (file)
@@ -389,7 +389,7 @@ class RookCluster(object):
             else:
                 raise
 
-    def apply_filesystem(self, spec: ServiceSpec) -> None:
+    def apply_filesystem(self, spec: ServiceSpec) -> str:
         # TODO use spec.placement
         # TODO warn if spec.extended has entries we don't kow how
         #      to action.
@@ -456,7 +456,7 @@ class RookCluster(object):
             cos.CephObjectStore, 'cephobjectstores', name,
             _update_zone, _create_zone)
 
-    def apply_nfsgw(self, spec: NFSServiceSpec) -> None:
+    def apply_nfsgw(self, spec: NFSServiceSpec) -> str:
         # TODO use spec.placement
         # TODO warn if spec.extended has entries we don't kow how
         #      to action.