mgr/rook: implement OSD removal in the Rook Orchestrator
author Joseph Sawaya <jsawaya@redhat.com>
Wed, 28 Jul 2021 17:32:24 +0000 (13:32 -0400)
committer Joseph Sawaya <jsawaya@redhat.com>
Thu, 19 Aug 2021 20:01:51 +0000 (16:01 -0400)
This commit implements OSD removal for the Rook Orchestrator
by implementing the remove_osds methods in RookOrchestrator
and RookCluster.

This commit creates the DefaultRemover class, which makes the
Kubernetes API calls, patches the cluster CR and issues the
mon_command calls.

Signed-off-by: Joseph Sawaya <jsawaya@redhat.com>
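
For orientation, a minimal sketch (not part of this commit) of how the
new path is exercised once the module is loaded; the direct call and the
"orch" instance name stand in for whatever dispatches `ceph orch osd rm`:

    # hypothetical driver, assuming a live RookOrchestrator instance "orch";
    # this is the route `ceph orch osd rm 0 1` takes through the Rook backend
    completion = orch.remove_osds(['0', '1'], replace=False, force=False)
    # completion is an OrchResult wrapping the status string returned by
    # RookCluster.remove_osds() (e.g. "Success" from the CR patch)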
src/pybind/mgr/rook/module.py
src/pybind/mgr/rook/rook_cluster.py

index ecbb591dfb0754e9b04ac912d044b1b89e883da5..bf6f673b302d9f435bc380b38b3fa0cf995834c7 100644 (file)
@@ -478,6 +478,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                                 "support OSD creation.")
             result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
         return OrchResult(result_list)
+
+    def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False) -> OrchResult[str]:
+        assert self._rook_cluster is not None
+        res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command)
+        return OrchResult(res)
+
     """
     @handle_orch_error
     def create_osds(self, drive_group):
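
remove_osds hands the mgr's own mon_command down to RookCluster, so
DefaultRemover needs no extra plumbing to reach the mons. A sketch of the
callable shape it expects (an illustrative stub, not from the commit):

    from typing import Any, Dict, Tuple

    # mon_command takes a command dict and returns (retval, stdout, stderr);
    # a stub like this is enough to drive DefaultRemover in a unit test
    def fake_mon_command(cmd: Dict[str, Any]) -> Tuple[int, str, str]:
        # e.g. {'prefix': 'osd safe-to-destroy', 'ids': ['0', '1']}
        return 0, '', ''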
index 18bf853ff390fc02bf2313db95aac510ef1d8fed..89659cee473751a880e3ccf8b294b8e24b242f20 100644 (file)
@@ -472,6 +472,152 @@ class LSOCreator(DefaultCreator):
                             count += 1
         return device_list
 
+class DefaultRemover():
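+    # Coordinates the whole removal: Kubernetes API calls, CephCluster CR
+    # patching, and mon_command calls (see remove() for the step ordering).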
+    def __init__(
+        self,
+        coreV1_api: 'client.CoreV1Api', 
+        batchV1_api: 'client.BatchV1Api', 
+        appsV1_api: 'client.AppsV1Api', 
+        osd_ids: List[str], 
+        replace_flag: bool, 
+        force_flag: bool, 
+        mon_command: Callable, 
+        patch: Callable, 
+        rook_env: 'RookEnv',
+        pods: KubernetesResource, 
+        inventory: Dict[str, List[Device]]
+    ):
+        self.batchV1_api = batchV1_api
+        self.appsV1_api = appsV1_api
+        self.coreV1_api = coreV1_api
+
+        self.osd_ids = osd_ids
+        self.replace_flag = replace_flag
+        self.force_flag = force_flag
+
+        self.mon_command = mon_command
+
+        self.patch = patch
+        self.rook_env = rook_env
+
+        self.pods = pods
+        self.inventory = inventory
+
+        self.jobs: KubernetesResource = KubernetesResource(
+            self.batchV1_api.list_namespaced_job, namespace='rook-ceph')
+        self.pvcs: KubernetesResource = KubernetesResource(
+            self.coreV1_api.list_namespaced_persistent_volume_claim,
+            namespace='rook-ceph')
+
+
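+    # Count the OSDs being removed per storageClassDeviceSet, then patch the
+    # CephCluster CR to shrink those sets (or drop them entirely) so Rook
+    # does not re-create the removed OSDs.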
+    def remove_device_sets(self) -> str:
+        self.to_remove: Dict[str, int] = {}
+        for pod in self.pods.items:
+            if (
+                hasattr(pod, 'metadata') 
+                and hasattr(pod.metadata, 'labels') 
+                and 'osd' in pod.metadata.labels 
+                and pod.metadata.labels['osd'] in self.osd_ids
+            ):
+                device_set = pod.metadata.labels['ceph.rook.io/DeviceSet']
+                self.to_remove[device_set] = self.to_remove.get(device_set, 0) + 1
+        def _remove_osds(current_cluster, new_cluster):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
+            # iterate over a copy: removing a device set from the list while
+            # looping over it directly would skip the following element
+            for _set in list(new_cluster.spec.storage.storageClassDeviceSets):
+                if _set.name in self.to_remove:
+                    if _set.count == self.to_remove[_set.name]:
+                        new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
+                    else:
+                        _set.count = _set.count - self.to_remove[_set.name]
+            return new_cluster
+        return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
+
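+    # Unless the force flag was given, ask the mons whether the OSDs are
+    # safe to destroy before touching anything.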
+    def check_force(self) -> None:
+        if not self.force_flag:
+            safe_args = {'prefix': 'osd safe-to-destroy',
+                         'ids': [str(x) for x in self.osd_ids]}
+            ret, out, err = self.mon_command(safe_args)
+            if ret != 0:
+                log.error(f"error when checking if osds are safe to destroy, {err}")
+                raise RuntimeError(err)
+
+    def set_osds_down(self) -> None:
+        down_flag_args = {
+            'prefix': 'osd down',
+            'ids': [str(x) for x in self.osd_ids]
+        }
+        ret, out, err = self.mon_command(down_flag_args)
+        if ret != 0:
+            log.error(f"error setting down flags on OSDs, {err}")
+            raise RuntimeError(err)
+
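+    # Scale each rook-ceph-osd-<id> deployment down to zero replicas so the
+    # OSD daemons stop before being marked down and out.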
+    def scale_deployments(self) -> None:
+        for osd_id in self.osd_ids:
+            self.appsV1_api.patch_namespaced_deployment_scale(
+                namespace='rook-ceph',
+                name='rook-ceph-osd-{}'.format(osd_id),
+                body=client.V1Scale(
+                    spec=client.V1ScaleSpec(
+                        replicas=0
+                    )
+                )
+            )
+
+    def set_osds_out(self) -> None:
+        out_flag_args = {
+            'prefix': 'osd out',
+            'ids': [str(x) for x in self.osd_ids]
+        }
+        ret, out, err = self.mon_command(out_flag_args)
+        if ret != 0:
+            log.error(f"error setting down flags on OSDs, {err}")
+            raise RuntimeError(err)
+            
+    def delete_deployments(self) -> None:
+        for osd_id in self.osd_ids:
+            self.appsV1_api.delete_namespaced_deployment(
+                namespace='rook-ceph',
+                name='rook-ceph-osd-{}'.format(osd_id),
+                propagation_policy='Foreground')
+
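+    # Remove the osd-prepare jobs for the affected device sets along with
+    # the PVCs they were bound to.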
+    def clean_up_prepare_jobs_and_pvc(self) -> None:
+        for job in self.jobs.items:
+            labels = job.metadata.labels
+            # use .get(): jobs other than osd-prepare may lack these labels
+            if labels.get('app') == 'rook-ceph-osd-prepare' and labels.get('ceph.rook.io/DeviceSet') in self.to_remove:
+                self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
+                self.coreV1_api.delete_namespaced_persistent_volume_claim(name=labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')
+
+    def purge_osds(self) -> None:
+        for osd_id in self.osd_ids:
+            purge_args = {
+                'prefix': 'osd purge-actual',
+                'id': int(osd_id),
+                'yes_i_really_mean_it': True
+            }
+            ret, out, err = self.mon_command(purge_args)
+            if ret != 0:
+                log.error(f"error purging OSDs, {err}")
+                raise RuntimeError(err)
+
+    def destroy_osds(self) -> None:
+        for osd_id in self.osd_ids:
+            destroy_args = {
+                'prefix': 'osd destroy-actual',
+                'id': int(osd_id),
+                'yes_i_really_mean_it': True
+            }
+            ret, out, err = self.mon_command(destroy_args)
+            if ret != 0:
+                log.error(f"error destroying OSDs, {err}")
+                raise RuntimeError(err)
+
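+    # Entry point: run the removal steps in order; with the replace flag the
+    # OSD ids are destroyed (kept reserved for re-use) instead of purged.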
+    def remove(self) -> str:
+        self.check_force()
+        remove_result = self.remove_device_sets()
+        self.scale_deployments()
+        self.set_osds_down()
+        self.set_osds_out()
+        self.delete_deployments()
+        self.clean_up_prepare_jobs_and_pvc()
+        if self.replace_flag:
+            self.destroy_osds()
+        else:
+            self.purge_osds()
+
+        return remove_result
+
 class RookCluster(object):
     # import of client.CoreV1Api must be optional at import time.
     # Instead allow mgr/rook to be imported anyway.
@@ -865,6 +1011,23 @@ class RookCluster(object):
                 for _, add_osd in self.drive_group_map.items():
                     self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, add_osd)
 
+    def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
+        inventory = self.get_discovered_devices()
+        self.remover = DefaultRemover(
+            self.coreV1_api,
+            self.batchV1_api, 
+            self.appsV1_api, 
+            osd_ids, 
+            replace, 
+            force, 
+            mon_command, 
+            self._patch, 
+            self.rook_env,
+            self.rook_pods, 
+            inventory
+        )
+        return self.remover.remove()
+
     def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
         current_json = self.rook_api_get(
             "{}/{}".format(crd_name, cr_name)