From: Redouane Kachach Date: Thu, 22 Feb 2024 08:48:06 +0000 (+0100) Subject: mgr/rook: removing all the code related to OSDs creation/removal X-Git-Tag: v18.2.4~257^2~3 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=53dcbddb9429aacd01dca53701f4476a1eac2180;p=ceph.git mgr/rook: removing all the code related to OSDs creation/removal Fixes: https://tracker.ceph.com/issues/64211 Signed-off-by: Redouane Kachach --- diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py index ca2d168db636e..91099710c4b35 100644 --- a/src/pybind/mgr/rook/module.py +++ b/src/pybind/mgr/rook/module.py @@ -82,12 +82,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): default='local', desc='storage class name for LSO-discovered PVs', ), - Option( - 'drive_group_interval', - type='float', - default=300.0, - desc='interval in seconds between re-application of applied drive_groups', - ), ] @staticmethod @@ -126,9 +120,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self.config_notify() if TYPE_CHECKING: self.storage_class = 'foo' - self.drive_group_interval = 10.0 - self._load_drive_groups() self._shutdown = threading.Event() def config_notify(self) -> None: @@ -144,7 +136,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self.log.debug(' mgr option %s = %s', opt['name'], getattr(self, opt['name'])) # type: ignore assert isinstance(self.storage_class, str) - assert isinstance(self.drive_group_interval, float) if self._rook_cluster: self._rook_cluster.storage_class_name = self.storage_class @@ -211,10 +202,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self._initialized.set() self.config_notify() - while not self._shutdown.is_set(): - self._apply_drivegroups(list(self._drive_group_map.values())) - self._shutdown.wait(self.drive_group_interval) - @handle_orch_error def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]: host_list = None @@ -415,15 +402,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): running=sum_running_pods('osd') ) - # drivegroups - for name, dg in self._drive_group_map.items(): - spec[f'osd.{name}'] = orchestrator.ServiceDescription( - spec=dg, - last_refresh=now, - size=0, - running=0, - ) - if service_type == 'rbd-mirror' or service_type is None: # rbd-mirrors all_mirrors = self.rook_cluster.get_resource("cephrbdmirrors") @@ -576,9 +554,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): elif service_type == 'rbd-mirror': return self.rook_cluster.rm_service('cephrbdmirrors', service_id) elif service_type == 'osd': - if service_id in self._drive_group_map: - del self._drive_group_map[service_id] - self._save_drive_groups() return f'Removed {service_name}' elif service_type == 'ingress': self.log.info("{0} service '{1}' does not exist".format('ingress', service_id)) @@ -634,134 +609,11 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): def remove_daemons(self, names: List[str]) -> List[str]: return self.rook_cluster.remove_pods(names) - def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]: - for drive_group in specs: - self._drive_group_map[str(drive_group.service_id)] = drive_group - self._save_drive_groups() - return OrchResult(self._apply_drivegroups(specs)) - - def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]: - all_hosts = raise_if_exception(self.get_hosts()) - result_list: List[str] = [] - for drive_group 
in ls: - matching_hosts = drive_group.placement.filter_matching_hosts( - lambda label=None, as_hostspec=None: all_hosts - ) - - if not self.rook_cluster.node_exists(matching_hosts[0]): - raise RuntimeError("Node '{0}' is not in the Kubernetes " - "cluster".format(matching_hosts)) - - # Validate whether cluster CRD can accept individual OSD - # creations (i.e. not useAllDevices) - if not self.rook_cluster.can_create_osd(): - raise RuntimeError("Rook cluster configuration does not " - "support OSD creation.") - result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts)) - return result_list - - def _load_drive_groups(self) -> None: - stored_drive_group = self.get_store("drive_group_map") - self._drive_group_map: Dict[str, DriveGroupSpec] = {} - if stored_drive_group: - for name, dg in json.loads(stored_drive_group).items(): - try: - self._drive_group_map[name] = DriveGroupSpec.from_json(dg) - except ValueError as e: - self.log.error(f'Failed to load drive group {name} ({dg}): {e}') - - def _save_drive_groups(self) -> None: - json_drive_group_map = { - name: dg.to_json() for name, dg in self._drive_group_map.items() - } - self.set_store("drive_group_map", json.dumps(json_drive_group_map)) - - def remove_osds(self, - osd_ids: List[str], - replace: bool = False, - force: bool = False, - zap: bool = False, - no_destroy: bool = False) -> OrchResult[str]: - assert self._rook_cluster is not None - if zap: - raise RuntimeError("Rook does not support zapping devices during OSD removal.") - res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command) - return OrchResult(res) - def add_host_label(self, host: str, label: str) -> OrchResult[str]: return self.rook_cluster.add_host_label(host, label) - + def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]: return self.rook_cluster.remove_host_label(host, label) - """ - @handle_orch_error - def create_osds(self, drive_group): - # type: (DriveGroupSpec) -> str - # Creates OSDs from a drive group specification. - - # $: ceph orch osd create -i - - # The drivegroup file must only contain one spec at a time. - # - - targets = [] # type: List[str] - if drive_group.data_devices and drive_group.data_devices.paths: - targets += [d.path for d in drive_group.data_devices.paths] - if drive_group.data_directories: - targets += drive_group.data_directories - - all_hosts = raise_if_exception(self.get_hosts()) - - matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts) - - assert len(matching_hosts) == 1 - - if not self.rook_cluster.node_exists(matching_hosts[0]): - raise RuntimeError("Node '{0}' is not in the Kubernetes " - "cluster".format(matching_hosts)) - - # Validate whether cluster CRD can accept individual OSD - # creations (i.e. 
not useAllDevices) - if not self.rook_cluster.can_create_osd(): - raise RuntimeError("Rook cluster configuration does not " - "support OSD creation.") - - return self.rook_cluster.add_osds(drive_group, matching_hosts) - - # TODO: this was the code to update the progress reference: - - @handle_orch_error - def has_osds(matching_hosts: List[str]) -> bool: - - # Find OSD pods on this host - pod_osd_ids = set() - pods = self.k8s.list_namespaced_pod(self._rook_env.namespace, - label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name), - field_selector="spec.nodeName={0}".format( - matching_hosts[0] - )).items - for p in pods: - pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id'])) - - self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids)) - - found = [] - osdmap = self.get("osd_map") - for osd in osdmap['osds']: - osd_id = osd['osd'] - if osd_id not in pod_osd_ids: - continue - - metadata = self.get_metadata('osd', "%s" % osd_id) - if metadata and metadata['devices'] in targets: - found.append(osd_id) - else: - self.log.info("ignoring osd {0} {1}".format( - osd_id, metadata['devices'] if metadata else 'DNE' - )) - - return found is not None - """ @handle_orch_error def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]: diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py index 1a0caa263bfbd..16d498a70e30d 100644 --- a/src/pybind/mgr/rook/rook_cluster.py +++ b/src/pybind/mgr/rook/rook_cluster.py @@ -24,9 +24,20 @@ from urllib3.exceptions import ProtocolError from ceph.deployment.inventory import Device from ceph.deployment.drive_group import DriveGroupSpec -from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec +from ceph.deployment.service_spec import ( + ServiceSpec, + NFSServiceSpec, + RGWSpec, + PlacementSpec, + HostPlacementSpec, + HostPattern, +) from ceph.utils import datetime_now -from ceph.deployment.drive_selection.matchers import SizeMatcher +from ceph.deployment.drive_selection.matchers import ( + AllMatcher, + Matcher, + SizeMatcher, +) from nfs.cluster import create_ganesha_pool from nfs.module import Module from nfs.export import NFSRados @@ -372,324 +383,6 @@ class KubernetesCustomResource(KubernetesResource): "{} doesn't contain a metadata.name. 
Unable to track changes".format( self.api_func)) -class DefaultCreator(): - def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class_name: 'str'): - self.coreV1_api = coreV1_api - self.storage_class_name = storage_class_name - self.inventory = inventory - - def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem: - device_set = ccl.StorageClassDeviceSetsItem( - name=d.sys_api['pv_name'], - volumeClaimTemplates= ccl.VolumeClaimTemplatesList(), - count=1, - encrypted=drive_group.encrypted, - portable=False - ) - device_set.volumeClaimTemplates.append( - ccl.VolumeClaimTemplatesItem( - metadata=ccl.Metadata( - name="data" - ), - spec=ccl.Spec( - storageClassName=self.storage_class_name, - volumeMode="Block", - accessModes=ccl.CrdObjectList(["ReadWriteOnce"]), - resources={ - "requests":{ - "storage": 1 - } - }, - volumeName=d.sys_api['pv_name'] - ) - ) - ) - return device_set - - def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]: - device_list = [] - assert drive_group.data_devices is not None - sizematcher: Optional[SizeMatcher] = None - if drive_group.data_devices.size: - sizematcher = SizeMatcher('size', drive_group.data_devices.size) - limit = getattr(drive_group.data_devices, 'limit', None) - count = 0 - all = getattr(drive_group.data_devices, 'all', None) - paths = [device.path for device in drive_group.data_devices.paths] - osd_list = [] - for pod in rook_pods.items: - if ( - hasattr(pod, 'metadata') - and hasattr(pod.metadata, 'labels') - and 'osd' in pod.metadata.labels - and 'ceph.rook.io/DeviceSet' in pod.metadata.labels - ): - osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet']) - for _, node in self.inventory.items(): - for device in node: - if device.sys_api['pv_name'] in osd_list: - count += 1 - for _, node in self.inventory.items(): - for device in node: - if not limit or (count < limit): - if device.available: - if ( - all - or ( - device.sys_api['node'] in matching_hosts - and ((sizematcher != None) or sizematcher.compare(device)) - and ( - not drive_group.data_devices.paths - or (device.path in paths) - ) - ) - ): - device_list.append(device) - count += 1 - - return device_list - - def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any: - to_create = self.filter_devices(rook_pods, drive_group,matching_hosts) - assert drive_group.data_devices is not None - def _add_osds(current_cluster, new_cluster): - # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster - if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage: - new_cluster.spec.storage = ccl.Storage() - - if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets: - new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList() - - existing_scds = [ - scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets - ] - for device in to_create: - new_scds = self.device_to_device_set(drive_group, device) - if new_scds.name not in existing_scds: - new_cluster.spec.storage.storageClassDeviceSets.append(new_scds) - return new_cluster - return _add_osds - -class LSOCreator(DefaultCreator): - def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]: - device_list = [] - assert drive_group.data_devices is 
not None - sizematcher = None - if drive_group.data_devices.size: - sizematcher = SizeMatcher('size', drive_group.data_devices.size) - limit = getattr(drive_group.data_devices, 'limit', None) - all = getattr(drive_group.data_devices, 'all', None) - paths = [device.path for device in drive_group.data_devices.paths] - vendor = getattr(drive_group.data_devices, 'vendor', None) - model = getattr(drive_group.data_devices, 'model', None) - count = 0 - osd_list = [] - for pod in rook_pods.items: - if ( - hasattr(pod, 'metadata') - and hasattr(pod.metadata, 'labels') - and 'osd' in pod.metadata.labels - and 'ceph.rook.io/DeviceSet' in pod.metadata.labels - ): - osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet']) - for _, node in self.inventory.items(): - for device in node: - if device.sys_api['pv_name'] in osd_list: - count += 1 - for _, node in self.inventory.items(): - for device in node: - if not limit or (count < limit): - if device.available: - if ( - all - or ( - device.sys_api['node'] in matching_hosts - and ((sizematcher != None) or sizematcher.compare(device)) - and ( - not drive_group.data_devices.paths - or device.path in paths - ) - and ( - not vendor - or device.sys_api['vendor'] == vendor - ) - and ( - not model - or device.sys_api['model'].startsWith(model) - ) - ) - ): - device_list.append(device) - count += 1 - return device_list - -class DefaultRemover(): - def __init__( - self, - coreV1_api: 'client.CoreV1Api', - batchV1_api: 'client.BatchV1Api', - appsV1_api: 'client.AppsV1Api', - osd_ids: List[str], - replace_flag: bool, - force_flag: bool, - mon_command: Callable, - patch: Callable, - rook_env: 'RookEnv', - inventory: Dict[str, List[Device]] - ): - self.batchV1_api = batchV1_api - self.appsV1_api = appsV1_api - self.coreV1_api = coreV1_api - - self.osd_ids = osd_ids - self.replace_flag = replace_flag - self.force_flag = force_flag - - self.mon_command = mon_command - - self.patch = patch - self.rook_env = rook_env - - self.inventory = inventory - self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, - namespace=self.rook_env.namespace, - label_selector='app=rook-ceph-osd') - self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, - namespace=self.rook_env.namespace, - label_selector='app=rook-ceph-osd-prepare') - self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, - namespace=self.rook_env.namespace) - - - def remove_device_sets(self) -> str: - self.to_remove: Dict[str, int] = {} - self.pvc_to_remove: List[str] = [] - for pod in self.osd_pods.items: - if ( - hasattr(pod, 'metadata') - and hasattr(pod.metadata, 'labels') - and 'osd' in pod.metadata.labels - and pod.metadata.labels['osd'] in self.osd_ids - ): - if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove: - self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1 - else: - self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1 - self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc']) - def _remove_osds(current_cluster, new_cluster): - # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster - assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None - for _set in new_cluster.spec.storage.storageClassDeviceSets: - if _set.name in self.to_remove: - if _set.count == self.to_remove[_set.name]: - 
new_cluster.spec.storage.storageClassDeviceSets.remove(_set) - else: - _set.count = _set.count - self.to_remove[_set.name] - return new_cluster - return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds) - - def check_force(self) -> None: - if not self.force_flag: - safe_args = {'prefix': 'osd safe-to-destroy', - 'ids': [str(x) for x in self.osd_ids]} - ret, out, err = self.mon_command(safe_args) - if ret != 0: - raise RuntimeError(err) - - def set_osds_down(self) -> None: - down_flag_args = { - 'prefix': 'osd down', - 'ids': [str(x) for x in self.osd_ids] - } - ret, out, err = self.mon_command(down_flag_args) - if ret != 0: - raise RuntimeError(err) - - def scale_deployments(self) -> None: - for osd_id in self.osd_ids: - self.appsV1_api.patch_namespaced_deployment_scale(namespace=self.rook_env.namespace, - name='rook-ceph-osd-{}'.format(osd_id), - body=client.V1Scale(spec=client.V1ScaleSpec(replicas=0))) - - def set_osds_out(self) -> None: - out_flag_args = { - 'prefix': 'osd out', - 'ids': [str(x) for x in self.osd_ids] - } - ret, out, err = self.mon_command(out_flag_args) - if ret != 0: - raise RuntimeError(err) - - def delete_deployments(self) -> None: - for osd_id in self.osd_ids: - self.appsV1_api.delete_namespaced_deployment(namespace=self.rook_env.namespace, - name='rook-ceph-osd-{}'.format(osd_id), - propagation_policy='Foreground') - - def clean_up_prepare_jobs_and_pvc(self) -> None: - for job in self.jobs.items: - if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove: - self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace=self.rook_env.namespace, - propagation_policy='Foreground') - self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], - namespace=self.rook_env.namespace, - propagation_policy='Foreground') - - def purge_osds(self) -> None: - for id in self.osd_ids: - purge_args = { - 'prefix': 'osd purge-actual', - 'id': int(id), - 'yes_i_really_mean_it': True - } - ret, out, err = self.mon_command(purge_args) - if ret != 0: - raise RuntimeError(err) - - def destroy_osds(self) -> None: - for id in self.osd_ids: - destroy_args = { - 'prefix': 'osd destroy-actual', - 'id': int(id), - 'yes_i_really_mean_it': True - } - ret, out, err = self.mon_command(destroy_args) - if ret != 0: - raise RuntimeError(err) - - def remove(self) -> str: - try: - self.check_force() - except Exception as e: - log.exception("Error checking if OSDs are safe to destroy") - return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}" - try: - remove_result = self.remove_device_sets() - except Exception as e: - log.exception("Error patching ceph cluster CRD") - return f"Not possible to modify Ceph cluster CRD: {e}" - try: - self.scale_deployments() - self.delete_deployments() - self.clean_up_prepare_jobs_and_pvc() - except Exception as e: - log.exception("Ceph cluster CRD patched, but error cleaning environment") - return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}" - try: - self.set_osds_down() - self.set_osds_out() - if self.replace_flag: - self.destroy_osds() - else: - self.purge_osds() - except Exception as e: - log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster") - return f"Error removing OSDs from Ceph cluster: {e}" - - return remove_result - - - class RookCluster(object): # import of client.CoreV1Api must be optional at import time. # Instead allow mgr/rook to be imported anyway. 
@@ -1101,7 +794,6 @@ class RookCluster(object): name=spec.rgw_zone ) return object_store - def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore: if new.spec.gateway: @@ -1193,48 +885,11 @@ class RookCluster(object): return new return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count) - def add_osds(self, drive_group, matching_hosts): - # type: (DriveGroupSpec, List[str]) -> str - assert drive_group.objectstore in ("bluestore", "filestore") - assert drive_group.service_id - storage_class = self.get_storage_class() - inventory = self.get_discovered_devices() - creator: Optional[DefaultCreator] = None - if ( - storage_class.metadata.labels - and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels - ): - creator = LSOCreator(inventory, self.coreV1_api, self.storage_class_name) - else: - creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class_name) - return self._patch( - ccl.CephCluster, - 'cephclusters', - self.rook_env.cluster_name, - creator.add_osds(self.rook_pods, drive_group, matching_hosts) - ) - - def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str: - inventory = self.get_discovered_devices() - self.remover = DefaultRemover( - self.coreV1_api, - self.batchV1_api, - self.appsV1_api, - osd_ids, - replace, - force, - mon_command, - self._patch, - self.rook_env, - inventory - ) - return self.remover.remove() - def get_hosts(self) -> List[orchestrator.HostSpec]: ret = [] for node in self.nodes.items: spec = orchestrator.HostSpec( - node.metadata.name, + node.metadata.name, addr='/'.join([addr.address for addr in node.status.addresses]), labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')], ) @@ -1590,7 +1245,7 @@ def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) -> res.label = expression.key.split('/')[1] elif expression.key == "kubernetes.io/hostname": if expression.operator == "Exists": - res.host_pattern = "*" + res.host_pattern = HostPattern("*") elif expression.operator == "In": res.hosts = [HostPlacementSpec(hostname=value, network='', name='')for value in expression.values] return res
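
The net effect of dropping these overrides: OSD creation/removal requests are no longer serviced by the Rook orchestrator module itself, and the OSD lifecycle is presumably meant to be driven through Rook's own CephCluster CR instead. Below is a minimal standalone sketch of the resulting pattern, using simplified stand-in classes rather than the real ceph mgr interfaces: once a backend stops overriding an operation, callers fall through to the base interface, which reports the operation as unsupported.

# Illustrative sketch only -- simplified stand-ins, not the actual ceph mgr classes.
from typing import List


class Orchestrator:
    """Base interface: operations default to 'not supported'."""

    def apply_drivegroups(self, specs: List[dict]) -> List[str]:
        raise NotImplementedError("OSD creation is not supported by this backend")

    def remove_osds(self, osd_ids: List[str]) -> str:
        raise NotImplementedError("OSD removal is not supported by this backend")


class RookBackend(Orchestrator):
    """No OSD-related overrides remain; requests fall back to the base class."""
    pass


if __name__ == "__main__":
    backend = RookBackend()
    try:
        backend.apply_drivegroups([{"service_id": "default_drive_group"}])
    except NotImplementedError as e:
        print(f"orchestrator backend rejected the request: {e}")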