count += 1
return device_list
+class DefaultRemover:
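+    """
+    Remove OSDs from a Rook-managed Ceph cluster.
+
+    The removal proceeds in stages: verify the OSDs are safe to destroy,
+    shrink the affected storageClassDeviceSets in the CephCluster CR,
+    stop and delete the OSD deployments, clean up the prepare jobs and
+    PVCs, and finally purge (or, when replacing, destroy) the OSDs.
+    """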
+ def __init__(
+ self,
+ coreV1_api: 'client.CoreV1Api',
+ batchV1_api: 'client.BatchV1Api',
+ appsV1_api: 'client.AppsV1Api',
+ osd_ids: List[str],
+ replace_flag: bool,
+ force_flag: bool,
+ mon_command: Callable,
+ patch: Callable,
+ rook_env: 'RookEnv',
+ pods: KubernetesResource,
+ inventory: Dict[str, List[Device]]
+ ):
+ self.batchV1_api = batchV1_api
+ self.appsV1_api = appsV1_api
+ self.coreV1_api = coreV1_api
+
+ self.osd_ids = osd_ids
+ self.replace_flag = replace_flag
+ self.force_flag = force_flag
+
+ self.mon_command = mon_command
+
+ self.patch = patch
+ self.rook_env = rook_env
+
+ self.pods = pods
+ self.inventory = inventory
+
+        self.jobs: KubernetesResource = KubernetesResource(
+            self.batchV1_api.list_namespaced_job, namespace='rook-ceph')
+        self.pvcs: KubernetesResource = KubernetesResource(
+            self.coreV1_api.list_namespaced_persistent_volume_claim, namespace='rook-ceph')
+
+ def remove_device_sets(self) -> str:
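+        # Tally how many of the OSDs being removed belong to each device
+        # set, then patch the CephCluster CR accordingly.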
+ self.to_remove: Dict[str, int] = {}
+ for pod in self.pods.items:
+ if (
+ hasattr(pod, 'metadata')
+ and hasattr(pod.metadata, 'labels')
+ and 'osd' in pod.metadata.labels
+ and pod.metadata.labels['osd'] in self.osd_ids
+ ):
+                device_set = pod.metadata.labels['ceph.rook.io/DeviceSet']
+                self.to_remove[device_set] = self.to_remove.get(device_set, 0) + 1
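+        # Callback for self.patch: receives the current CephCluster CR and
+        # a copy to mutate; drops a device set entirely once all of its
+        # OSDs are removed, otherwise just decrements its count.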
+        def _remove_osds(current_cluster: ccl.CephCluster, new_cluster: ccl.CephCluster) -> ccl.CephCluster:
+            assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
+            # Iterate over a copy: removing entries from the list being
+            # iterated over would skip the element after each removal.
+            for _set in list(new_cluster.spec.storage.storageClassDeviceSets):
+ if _set.name in self.to_remove:
+ if _set.count == self.to_remove[_set.name]:
+ new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
+ else:
+ _set.count = _set.count - self.to_remove[_set.name]
+ return new_cluster
+ return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
+
+ def check_force(self) -> None:
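+        # Unless the force flag is set, ask the mons whether the OSDs can
+        # be removed without reducing data durability or availability.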
+ if not self.force_flag:
+            safe_args = {
+                'prefix': 'osd safe-to-destroy',
+                'ids': [str(x) for x in self.osd_ids]
+            }
+ ret, out, err = self.mon_command(safe_args)
+ if ret != 0:
+ log.error(f"error when checking if osds are safe to destroy, {err}")
+ raise RuntimeError(err)
+
+ def set_osds_down(self) -> None:
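+        # Mark the OSDs down in the cluster map via the 'osd down' command.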
+ down_flag_args = {
+ 'prefix': 'osd down',
+ 'ids': [str(x) for x in self.osd_ids]
+ }
+ ret, out, err = self.mon_command(down_flag_args)
+ if ret != 0:
+ log.error(f"error setting down flags on OSDs, {err}")
+ raise RuntimeError(err)
+
+ def scale_deployments(self) -> None:
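+        # Scale the rook-ceph-osd-<id> deployments to zero replicas so the
+        # OSD daemons stop running before they are removed from the cluster.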
+ for osd_id in self.osd_ids:
+            self.appsV1_api.patch_namespaced_deployment_scale(
+                namespace='rook-ceph',
+                name='rook-ceph-osd-{}'.format(osd_id),
+                body=client.V1Scale(
+                    spec=client.V1ScaleSpec(replicas=0)))
+
+ def set_osds_out(self) -> None:
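+        # Mark the OSDs out so that their placement groups are backfilled
+        # onto the remaining OSDs.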
+ out_flag_args = {
+ 'prefix': 'osd out',
+ 'ids': [str(x) for x in self.osd_ids]
+ }
+ ret, out, err = self.mon_command(out_flag_args)
+ if ret != 0:
+ log.error(f"error setting down flags on OSDs, {err}")
+ raise RuntimeError(err)
+
+ def delete_deployments(self) -> None:
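+        # Delete the OSD deployments; the 'Foreground' propagation policy
+        # makes Kubernetes remove dependent objects before the owner.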
+ for osd_id in self.osd_ids:
+            self.appsV1_api.delete_namespaced_deployment(
+                namespace='rook-ceph',
+                name='rook-ceph-osd-{}'.format(osd_id),
+                propagation_policy='Foreground')
+
+ def clean_up_prepare_jobs_and_pvc(self) -> None:
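+        # Delete the rook-ceph-osd-prepare jobs for the affected device
+        # sets along with the PVCs that backed the removed OSDs.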
+        for job in self.jobs.items:
+            labels = job.metadata.labels
+            if labels.get('app') == 'rook-ceph-osd-prepare' and labels.get('ceph.rook.io/DeviceSet') in self.to_remove:
+                self.batchV1_api.delete_namespaced_job(
+                    name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
+                self.coreV1_api.delete_namespaced_persistent_volume_claim(
+                    name=labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')
+
+ def purge_osds(self) -> None:
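+        # 'osd purge-actual' removes the OSDs from the CRUSH map and
+        # deletes their auth entries, freeing the OSD ids for reuse.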
+        for osd_id in self.osd_ids:
+            purge_args = {
+                'prefix': 'osd purge-actual',
+                'id': int(osd_id),
+                'yes_i_really_mean_it': True
+            }
+            ret, out, err = self.mon_command(purge_args)
+            if ret != 0:
+                log.error(f"error purging OSDs: {err}")
+                raise RuntimeError(err)
+
+ def destroy_osds(self) -> None:
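+        # 'osd destroy-actual' marks the OSDs as destroyed but keeps their
+        # ids in place, so replacement disks can take over the same ids.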
+        for osd_id in self.osd_ids:
+            destroy_args = {
+                'prefix': 'osd destroy-actual',
+                'id': int(osd_id),
+                'yes_i_really_mean_it': True
+            }
+            ret, out, err = self.mon_command(destroy_args)
+            if ret != 0:
+                log.error(f"error destroying OSDs: {err}")
+                raise RuntimeError(err)
+
+ def remove(self) -> str:
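+        # Run the removal end to end; the patched CephCluster CR from
+        # remove_device_sets() is returned as the operation's result.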
+ self.check_force()
+ remove_result = self.remove_device_sets()
+ self.scale_deployments()
+ self.set_osds_down()
+ self.set_osds_out()
+ self.delete_deployments()
+ self.clean_up_prepare_jobs_and_pvc()
+ if self.replace_flag:
+ self.destroy_osds()
+ else:
+ self.purge_osds()
+
+ return remove_result
+
class RookCluster(object):
# import of client.CoreV1Api must be optional at import time.
# Instead allow mgr/rook to be imported anyway.
for _, add_osd in self.drive_group_map.items():
self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, add_osd)
+ def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
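+        # Orchestrator entry point: collect the current device inventory
+        # and delegate the removal to DefaultRemover.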
+ inventory = self.get_discovered_devices()
+ self.remover = DefaultRemover(
+ self.coreV1_api,
+ self.batchV1_api,
+ self.appsV1_api,
+ osd_ids,
+ replace,
+ force,
+ mon_command,
+ self._patch,
+ self.rook_env,
+ self.rook_pods,
+ inventory
+ )
+ return self.remover.remove()
+
def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
current_json = self.rook_api_get(
"{}/{}".format(crd_name, cr_name)