From cf494274b02bbe6d13d41cae485600c3c0d5b7a1 Mon Sep 17 00:00:00 2001
From: Joseph Sawaya
Date: Wed, 28 Jul 2021 13:32:24 -0400
Subject: [PATCH] mgr/rook: implement OSD removal in the Rook Orchestrator

This commit implements OSD removal for the Rook Orchestrator by
implementing the remove_osds methods in RookOrchestrator and RookCluster.
This commit creates the DefaultRemover class that handles making k8s API
calls, patching the cluster CR and making mon_command calls.

Signed-off-by: Joseph Sawaya
---
 src/pybind/mgr/rook/module.py       |   6 +
 src/pybind/mgr/rook/rook_cluster.py | 163 ++++++++++++++++++++++++++++
 2 files changed, 169 insertions(+)

diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index ecbb591dfb075..bf6f673b302d9 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -478,6 +478,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                                    "support OSD creation.")
             result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
         return OrchResult(result_list)
+
+    def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False) -> OrchResult[str]:
+        assert self._rook_cluster is not None
+        res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command)
+        return OrchResult(res)
+
     """
     @handle_orch_error
     def create_osds(self, drive_group):
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index 18bf853ff390f..89659cee47375 100644
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -472,6 +472,152 @@ class LSOCreator(DefaultCreator):
                     count += 1
         return device_list
 
+class DefaultRemover():
+    def __init__(
+        self,
+        coreV1_api: 'client.CoreV1Api',
+        batchV1_api: 'client.BatchV1Api',
+        appsV1_api: 'client.AppsV1Api',
+        osd_ids: List[str],
+        replace_flag: bool,
+        force_flag: bool,
+        mon_command: Callable,
+        patch: Callable,
+        rook_env: 'RookEnv',
+        pods: KubernetesResource,
+        inventory: Dict[str, List[Device]]
+    ):
+        self.batchV1_api = batchV1_api
+        self.appsV1_api = appsV1_api
+        self.coreV1_api = coreV1_api
+
+        self.osd_ids = osd_ids
+        self.replace_flag = replace_flag
+        self.force_flag = force_flag
+
+        self.mon_command = mon_command
+
+        self.patch = patch
+        self.rook_env = rook_env
+
+        self.pods = pods
+        self.inventory = inventory
+
+        self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, namespace='rook-ceph')
+        self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, namespace='rook-ceph')
+
+
+    def remove_device_sets(self) -> str:
+        self.to_remove: Dict[str, int] = {}
+        for pod in self.pods.items:
+            if (
+                hasattr(pod, 'metadata')
+                and hasattr(pod.metadata, 'labels')
+                and 'osd' in pod.metadata.labels
+                and pod.metadata.labels['osd'] in self.osd_ids
+            ):
+                if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
+                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
+                else:
+                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
+        def _remove_osds(current_cluster, new_cluster):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
+            for _set in new_cluster.spec.storage.storageClassDeviceSets:
+                if _set.name in self.to_remove:
+                    if _set.count == self.to_remove[_set.name]:
+                        new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
+                    else:
+                        _set.count = _set.count - self.to_remove[_set.name]
+            return new_cluster
+        return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
+
+    def check_force(self) -> None:
+        if not self.force_flag:
+            safe_args = {'prefix': 'osd safe-to-destroy',
+                         'ids': [str(x) for x in self.osd_ids]}
+            ret, out, err = self.mon_command(safe_args)
+            if ret != 0:
+                log.error(f"error when checking if osds are safe to destroy, {err}")
+                raise RuntimeError(err)
+
+    def set_osds_down(self) -> None:
+        down_flag_args = {
+            'prefix': 'osd down',
+            'ids': [str(x) for x in self.osd_ids]
+        }
+        ret, out, err = self.mon_command(down_flag_args)
+        if ret != 0:
+            log.error(f"error setting down flags on OSDs, {err}")
+            raise RuntimeError(err)
+
+    def scale_deployments(self) -> None:
+        for osd_id in self.osd_ids:
+            self.appsV1_api.patch_namespaced_deployment_scale(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), body=client.V1Scale(
+                spec=client.V1ScaleSpec(
+                    replicas=0
+                )
+            ))
+
+    def set_osds_out(self) -> None:
+        out_flag_args = {
+            'prefix': 'osd out',
+            'ids': [str(x) for x in self.osd_ids]
+        }
+        ret, out, err = self.mon_command(out_flag_args)
+        if ret != 0:
+            log.error(f"error setting out flags on OSDs, {err}")
+            raise RuntimeError(err)
+
+    def delete_deployments(self) -> None:
+        for osd_id in self.osd_ids:
+            self.appsV1_api.delete_namespaced_deployment(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), propagation_policy='Foreground')
+
+    def clean_up_prepare_jobs_and_pvc(self) -> None:
+        for job in self.jobs.items:
+            if job.metadata.labels['app'] == 'rook-ceph-osd-prepare' and job.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
+                self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
+                self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')
+
+    def purge_osds(self) -> None:
+        for id in self.osd_ids:
+            purge_args = {
+                'prefix': 'osd purge-actual',
+                'id': int(id),
+                'yes_i_really_mean_it': True
+            }
+            ret, out, err = self.mon_command(purge_args)
+            if ret != 0:
+                log.error(f"error purging OSDs, {err}")
+                raise RuntimeError(err)
+
+    def destroy_osds(self) -> None:
+        for id in self.osd_ids:
+            destroy_args = {
+                'prefix': 'osd destroy-actual',
+                'id': int(id),
+                'yes_i_really_mean_it': True
+            }
+            ret, out, err = self.mon_command(destroy_args)
+            if ret != 0:
+                log.error(f"error destroying OSDs, {err}")
+                raise RuntimeError(err)
+
+    def remove(self) -> str:
+        self.check_force()
+        remove_result = self.remove_device_sets()
+        self.scale_deployments()
+        self.set_osds_down()
+        self.set_osds_out()
+        self.delete_deployments()
+        self.clean_up_prepare_jobs_and_pvc()
+        if self.replace_flag:
+            self.destroy_osds()
+        else:
+            self.purge_osds()
+
+        return remove_result
+
 class RookCluster(object):
     # import of client.CoreV1Api must be optional at import time.
     # Instead allow mgr/rook to be imported anyway.
@@ -865,6 +1011,23 @@ class RookCluster(object):
         for _, add_osd in self.drive_group_map.items():
             self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, add_osd)
 
+    def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
+        inventory = self.get_discovered_devices()
+        self.remover = DefaultRemover(
+            self.coreV1_api,
+            self.batchV1_api,
+            self.appsV1_api,
+            osd_ids,
+            replace,
+            force,
+            mon_command,
+            self._patch,
+            self.rook_env,
+            self.rook_pods,
+            inventory
+        )
+        return self.remover.remove()
+
     def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
         current_json = self.rook_api_get(
             "{}/{}".format(crd_name, cr_name)
-- 
2.39.5
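
A minimal usage sketch (not part of the diff above), assuming the rook
orchestrator backend is active: it drives the RookOrchestrator.remove_osds()
method added in module.py. `mgr` is a hypothetical stand-in for the running
RookOrchestrator module instance, and the OSD ids are examples.

    # Sketch only: `mgr` is assumed to be the active RookOrchestrator instance.
    from orchestrator import raise_if_exception

    # Purge OSDs 3 and 4: DefaultRemover.remove() checks `osd safe-to-destroy`
    # (skipped when force=True), shrinks the matching storageClassDeviceSets in
    # the CephCluster CR, scales down and deletes the rook-ceph-osd-* deployments,
    # marks the OSDs down and out, cleans up the prepare jobs and PVCs, then
    # purges the ids from the cluster.
    completion = mgr.remove_osds(['3', '4'], replace=False, force=False)
    print(raise_if_exception(completion))

    # With replace=True the OSDs are destroyed instead of purged, so replacement
    # devices can reuse the same OSD ids.
    completion = mgr.remove_osds(['5'], replace=True, force=True)
    print(raise_if_exception(completion))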