mgr/rook: remove all the code related to OSD creation/removal
author Redouane Kachach <rkachach@redhat.com>
Thu, 22 Feb 2024 08:48:06 +0000 (09:48 +0100)
committer Redouane Kachach <rkachach@redhat.com>
Thu, 22 Feb 2024 08:48:06 +0000 (09:48 +0100)
Fixes: https://tracker.ceph.com/issues/64211
Signed-off-by: Redouane Kachach <rkachach@redhat.com>
src/pybind/mgr/rook/module.py
src/pybind/mgr/rook/rook_cluster.py

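With apply_drivegroups() and remove_osds() gone from RookOrchestrator, drive-group requests against the rook backend fall through to the generic Orchestrator interface. A minimal caller-side sketch of the resulting behaviour follows; it assumes the base-class methods in orchestrator._interface still raise NotImplementedError for operations a backend does not override, and `orch` stands for an already-obtained RookOrchestrator handle (both are assumptions, not part of this commit):

    from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
    from ceph.deployment.service_spec import PlacementSpec

    spec = DriveGroupSpec(service_id='default',
                          placement=PlacementSpec(host_pattern='*'),
                          data_devices=DeviceSelection(all=True))
    try:
        # Before this commit, this stored the spec in the mgr KV store
        # ("drive_group_map") and patched storageClassDeviceSets into the
        # CephCluster CR on a drive_group_interval timer.
        orch.apply_drivegroups([spec])
    except NotImplementedError:
        # mgr/rook no longer implements drive-group based OSD creation;
        # OSDs are managed through the Rook CephCluster CR instead.
        pass
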
diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index ca2d168db636e1c08c3dbebf5bb74abd1f8e9084..91099710c4b351dfd19f0153077b9b069728684e 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -82,12 +82,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             default='local',
             desc='storage class name for LSO-discovered PVs',
         ),
-        Option(
-            'drive_group_interval',
-            type='float',
-            default=300.0,
-            desc='interval in seconds between re-application of applied drive_groups',
-        ),
     ]
 
     @staticmethod
@@ -126,9 +120,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self.config_notify()
         if TYPE_CHECKING:
             self.storage_class = 'foo'
-            self.drive_group_interval = 10.0
 
-        self._load_drive_groups()
         self._shutdown = threading.Event()
 
     def config_notify(self) -> None:
@@ -144,7 +136,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             self.log.debug(' mgr option %s = %s',
                            opt['name'], getattr(self, opt['name']))  # type: ignore
         assert isinstance(self.storage_class, str)
-        assert isinstance(self.drive_group_interval, float)
 
         if self._rook_cluster:
             self._rook_cluster.storage_class_name = self.storage_class
@@ -211,10 +202,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._initialized.set()
         self.config_notify()
 
-        while not self._shutdown.is_set():
-            self._apply_drivegroups(list(self._drive_group_map.values()))
-            self._shutdown.wait(self.drive_group_interval)
-
     @handle_orch_error
     def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
         host_list = None
@@ -415,15 +402,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 running=sum_running_pods('osd')
             )
 
-            # drivegroups
-            for name, dg in self._drive_group_map.items():
-                spec[f'osd.{name}'] = orchestrator.ServiceDescription(
-                    spec=dg,
-                    last_refresh=now,
-                    size=0,
-                    running=0,
-                )
-
         if service_type == 'rbd-mirror' or service_type is None:
             # rbd-mirrors
             all_mirrors = self.rook_cluster.get_resource("cephrbdmirrors")
@@ -576,9 +554,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         elif service_type == 'rbd-mirror':
             return self.rook_cluster.rm_service('cephrbdmirrors', service_id)
         elif service_type == 'osd':
-            if service_id in self._drive_group_map:
-                del self._drive_group_map[service_id]
-                self._save_drive_groups()
             return f'Removed {service_name}'
         elif service_type == 'ingress':
             self.log.info("{0} service '{1}' does not exist".format('ingress', service_id))
@@ -634,134 +609,11 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
     def remove_daemons(self, names: List[str]) -> List[str]:
         return self.rook_cluster.remove_pods(names)
 
-    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
-        for drive_group in specs:
-            self._drive_group_map[str(drive_group.service_id)] = drive_group
-        self._save_drive_groups()
-        return OrchResult(self._apply_drivegroups(specs))
-
-    def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]:
-        all_hosts = raise_if_exception(self.get_hosts())
-        result_list: List[str] = []
-        for drive_group in ls:
-            matching_hosts = drive_group.placement.filter_matching_hosts(
-                lambda label=None, as_hostspec=None: all_hosts
-            )
-
-            if not self.rook_cluster.node_exists(matching_hosts[0]):
-                raise RuntimeError("Node '{0}' is not in the Kubernetes "
-                               "cluster".format(matching_hosts))
-
-            # Validate whether cluster CRD can accept individual OSD
-            # creations (i.e. not useAllDevices)
-            if not self.rook_cluster.can_create_osd():
-                raise RuntimeError("Rook cluster configuration does not "
-                                "support OSD creation.")
-            result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
-        return result_list
-
-    def _load_drive_groups(self) -> None:
-        stored_drive_group = self.get_store("drive_group_map")
-        self._drive_group_map: Dict[str, DriveGroupSpec] = {}
-        if stored_drive_group:
-            for name, dg in json.loads(stored_drive_group).items():
-                try:
-                    self._drive_group_map[name] = DriveGroupSpec.from_json(dg)
-                except ValueError as e:
-                    self.log.error(f'Failed to load drive group {name} ({dg}): {e}')
-
-    def _save_drive_groups(self) -> None:
-        json_drive_group_map = {
-            name: dg.to_json() for name, dg in self._drive_group_map.items()
-        }
-        self.set_store("drive_group_map", json.dumps(json_drive_group_map))
-
-    def remove_osds(self,
-                    osd_ids: List[str],
-                    replace: bool = False,
-                    force: bool = False,
-                    zap: bool = False,
-                    no_destroy: bool = False) -> OrchResult[str]:
-        assert self._rook_cluster is not None
-        if zap:
-            raise RuntimeError("Rook does not support zapping devices during OSD removal.")
-        res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command)
-        return OrchResult(res)
-
     def add_host_label(self, host: str, label: str) -> OrchResult[str]:
         return self.rook_cluster.add_host_label(host, label)
-    
+
     def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
         return self.rook_cluster.remove_host_label(host, label)
-    """
-    @handle_orch_error
-    def create_osds(self, drive_group):
-        # type: (DriveGroupSpec) -> str
-        # Creates OSDs from a drive group specification.
-
-        # $: ceph orch osd create -i <dg.file>
-
-        # The drivegroup file must only contain one spec at a time.
-        # 
-
-        targets = []  # type: List[str]
-        if drive_group.data_devices and drive_group.data_devices.paths:
-            targets += [d.path for d in drive_group.data_devices.paths]
-        if drive_group.data_directories:
-            targets += drive_group.data_directories
-
-        all_hosts = raise_if_exception(self.get_hosts())
-
-        matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
-
-        assert len(matching_hosts) == 1
-
-        if not self.rook_cluster.node_exists(matching_hosts[0]):
-            raise RuntimeError("Node '{0}' is not in the Kubernetes "
-                               "cluster".format(matching_hosts))
-
-        # Validate whether cluster CRD can accept individual OSD
-        # creations (i.e. not useAllDevices)
-        if not self.rook_cluster.can_create_osd():
-            raise RuntimeError("Rook cluster configuration does not "
-                               "support OSD creation.")
-
-        return self.rook_cluster.add_osds(drive_group, matching_hosts)
-
-        # TODO: this was the code to update the progress reference:
-        
-        @handle_orch_error
-        def has_osds(matching_hosts: List[str]) -> bool:
-
-            # Find OSD pods on this host
-            pod_osd_ids = set()
-            pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
-                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
-                                                field_selector="spec.nodeName={0}".format(
-                                                    matching_hosts[0]
-                                                )).items
-            for p in pods:
-                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
-
-            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))
-
-            found = []
-            osdmap = self.get("osd_map")
-            for osd in osdmap['osds']:
-                osd_id = osd['osd']
-                if osd_id not in pod_osd_ids:
-                    continue
-
-                metadata = self.get_metadata('osd', "%s" % osd_id)
-                if metadata and metadata['devices'] in targets:
-                    found.append(osd_id)
-                else:
-                    self.log.info("ignoring osd {0} {1}".format(
-                        osd_id, metadata['devices'] if metadata else 'DNE'
-                    ))
-
-            return found is not None        
-    """
 
     @handle_orch_error
     def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index 1a0caa263bfbde4bcdf7fd20b90f7eac04507497..16d498a70e30da68d6f6c95ab377fa92732e528f 100644
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -24,9 +24,20 @@ from urllib3.exceptions import ProtocolError
 
 from ceph.deployment.inventory import Device
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
+from ceph.deployment.service_spec import (
+    ServiceSpec,
+    NFSServiceSpec,
+    RGWSpec,
+    PlacementSpec,
+    HostPlacementSpec,
+    HostPattern,
+)
 from ceph.utils import datetime_now
-from ceph.deployment.drive_selection.matchers import SizeMatcher
+from ceph.deployment.drive_selection.matchers import (
+    AllMatcher,
+    Matcher,
+    SizeMatcher,
+)
 from nfs.cluster import create_ganesha_pool
 from nfs.module import Module
 from nfs.export import NFSRados
@@ -372,324 +383,6 @@ class KubernetesCustomResource(KubernetesResource):
                         "{} doesn't contain a metadata.name. Unable to track changes".format(
                             self.api_func))
 
-class DefaultCreator():
-    def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class_name: 'str'):
-        self.coreV1_api = coreV1_api
-        self.storage_class_name = storage_class_name
-        self.inventory = inventory
-
-    def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
-        device_set = ccl.StorageClassDeviceSetsItem(
-                    name=d.sys_api['pv_name'],
-                    volumeClaimTemplates= ccl.VolumeClaimTemplatesList(),
-                    count=1,
-                    encrypted=drive_group.encrypted,
-                    portable=False
-                )
-        device_set.volumeClaimTemplates.append(
-            ccl.VolumeClaimTemplatesItem(
-                metadata=ccl.Metadata(
-                    name="data"
-                ),
-                spec=ccl.Spec(
-                    storageClassName=self.storage_class_name,
-                    volumeMode="Block",
-                    accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
-                    resources={
-                        "requests":{
-                                "storage": 1
-                        }
-                    },
-                    volumeName=d.sys_api['pv_name']
-                )
-            )
-        )
-        return device_set
-
-    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
-        device_list = []
-        assert drive_group.data_devices is not None
-        sizematcher: Optional[SizeMatcher] = None
-        if drive_group.data_devices.size:
-            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
-        limit = getattr(drive_group.data_devices, 'limit', None)
-        count = 0
-        all = getattr(drive_group.data_devices, 'all', None)
-        paths = [device.path for device in drive_group.data_devices.paths]
-        osd_list = []
-        for pod in rook_pods.items:
-            if (
-                hasattr(pod, 'metadata') 
-                and hasattr(pod.metadata, 'labels') 
-                and 'osd' in pod.metadata.labels 
-                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
-            ):
-                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
-        for _, node in self.inventory.items():
-            for device in node:
-                if device.sys_api['pv_name'] in osd_list:
-                    count += 1
-        for _, node in self.inventory.items():
-            for device in node:
-                if not limit or (count < limit):
-                    if device.available:
-                        if (
-                            all 
-                            or (
-                                device.sys_api['node'] in matching_hosts
-                                and ((sizematcher != None) or sizematcher.compare(device))
-                                and (
-                                    not drive_group.data_devices.paths
-                                    or (device.path in paths)
-                                )
-                            )
-                        ):
-                            device_list.append(device)
-                            count += 1
-        
-        return device_list
-
-    def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
-        to_create = self.filter_devices(rook_pods, drive_group,matching_hosts)
-        assert drive_group.data_devices is not None
-        def _add_osds(current_cluster, new_cluster):
-            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
-            if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
-                new_cluster.spec.storage = ccl.Storage()
-
-            if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
-                new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
-
-            existing_scds = [
-                scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
-            ]
-            for device in to_create:
-                new_scds = self.device_to_device_set(drive_group, device)
-                if new_scds.name not in existing_scds:
-                    new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
-            return new_cluster
-        return _add_osds
-
-class LSOCreator(DefaultCreator):
-    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
-        device_list = []
-        assert drive_group.data_devices is not None
-        sizematcher = None
-        if drive_group.data_devices.size:
-            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
-        limit = getattr(drive_group.data_devices, 'limit', None)
-        all = getattr(drive_group.data_devices, 'all', None)
-        paths = [device.path for device in drive_group.data_devices.paths]
-        vendor = getattr(drive_group.data_devices, 'vendor', None)
-        model = getattr(drive_group.data_devices, 'model', None)
-        count = 0
-        osd_list = []
-        for pod in rook_pods.items:
-            if (
-                hasattr(pod, 'metadata') 
-                and hasattr(pod.metadata, 'labels') 
-                and 'osd' in pod.metadata.labels 
-                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
-            ):
-                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
-        for _, node in self.inventory.items():
-            for device in node:
-                if device.sys_api['pv_name'] in osd_list:
-                    count += 1
-        for _, node in self.inventory.items():
-            for device in node:
-                if not limit or (count < limit):
-                    if device.available:
-                        if (
-                            all 
-                            or (
-                                device.sys_api['node'] in matching_hosts
-                                and ((sizematcher != None) or sizematcher.compare(device))
-                                and (
-                                    not drive_group.data_devices.paths
-                                    or device.path in paths
-                                ) 
-                                and (
-                                    not vendor 
-                                    or device.sys_api['vendor'] == vendor
-                                )
-                                and (
-                                    not model 
-                                    or device.sys_api['model'].startsWith(model)
-                                )
-                            )
-                        ):
-                            device_list.append(device)
-                            count += 1
-        return device_list
-
-class DefaultRemover():
-    def __init__(
-        self,
-        coreV1_api: 'client.CoreV1Api', 
-        batchV1_api: 'client.BatchV1Api', 
-        appsV1_api: 'client.AppsV1Api', 
-        osd_ids: List[str], 
-        replace_flag: bool, 
-        force_flag: bool, 
-        mon_command: Callable, 
-        patch: Callable, 
-        rook_env: 'RookEnv',
-        inventory: Dict[str, List[Device]]
-    ):
-        self.batchV1_api = batchV1_api
-        self.appsV1_api = appsV1_api
-        self.coreV1_api = coreV1_api
-
-        self.osd_ids = osd_ids
-        self.replace_flag = replace_flag
-        self.force_flag = force_flag
-
-        self.mon_command = mon_command
-
-        self.patch = patch
-        self.rook_env = rook_env
-
-        self.inventory = inventory
-        self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod,
-                                                               namespace=self.rook_env.namespace,
-                                                               label_selector='app=rook-ceph-osd')
-        self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job,
-                                                           namespace=self.rook_env.namespace,
-                                                           label_selector='app=rook-ceph-osd-prepare')
-        self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim,
-                                                           namespace=self.rook_env.namespace)
-
-
-    def remove_device_sets(self) -> str:
-        self.to_remove: Dict[str, int] = {}
-        self.pvc_to_remove: List[str] = []
-        for pod in self.osd_pods.items:
-            if (
-                hasattr(pod, 'metadata') 
-                and hasattr(pod.metadata, 'labels') 
-                and 'osd' in pod.metadata.labels 
-                and pod.metadata.labels['osd'] in self.osd_ids
-            ):
-                if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
-                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
-                else:
-                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
-                self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
-        def _remove_osds(current_cluster, new_cluster):
-            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
-            assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
-            for _set in new_cluster.spec.storage.storageClassDeviceSets:
-                    if _set.name in self.to_remove:
-                        if _set.count == self.to_remove[_set.name]:
-                            new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
-                        else:
-                            _set.count = _set.count - self.to_remove[_set.name]
-            return new_cluster
-        return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
-
-    def check_force(self) -> None:
-        if not self.force_flag:
-            safe_args = {'prefix': 'osd safe-to-destroy',
-                        'ids': [str(x) for x in self.osd_ids]}
-            ret, out, err = self.mon_command(safe_args)
-            if ret != 0:
-                raise RuntimeError(err)
-
-    def set_osds_down(self) -> None:
-        down_flag_args = {
-            'prefix': 'osd down',
-            'ids': [str(x) for x in self.osd_ids]
-        }
-        ret, out, err = self.mon_command(down_flag_args)
-        if ret != 0:
-            raise RuntimeError(err)
-
-    def scale_deployments(self) -> None:
-        for osd_id in self.osd_ids:
-            self.appsV1_api.patch_namespaced_deployment_scale(namespace=self.rook_env.namespace,
-                                                              name='rook-ceph-osd-{}'.format(osd_id),
-                                                              body=client.V1Scale(spec=client.V1ScaleSpec(replicas=0)))
-
-    def set_osds_out(self) -> None:
-        out_flag_args = {
-            'prefix': 'osd out',
-            'ids': [str(x) for x in self.osd_ids]
-        }
-        ret, out, err = self.mon_command(out_flag_args)
-        if ret != 0:
-            raise RuntimeError(err)
-            
-    def delete_deployments(self) -> None:
-        for osd_id in self.osd_ids:
-            self.appsV1_api.delete_namespaced_deployment(namespace=self.rook_env.namespace,
-                                                         name='rook-ceph-osd-{}'.format(osd_id),
-                                                         propagation_policy='Foreground')
-
-    def clean_up_prepare_jobs_and_pvc(self) -> None:
-        for job in self.jobs.items:
-            if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
-                self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace=self.rook_env.namespace,
-                                                       propagation_policy='Foreground')
-                self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'],
-                                                                          namespace=self.rook_env.namespace,
-                                                                          propagation_policy='Foreground')
-
-    def purge_osds(self) -> None:
-        for id in self.osd_ids:
-            purge_args = {
-                'prefix': 'osd purge-actual',
-                'id': int(id),
-                'yes_i_really_mean_it': True
-            }
-            ret, out, err = self.mon_command(purge_args)
-            if ret != 0:
-                raise RuntimeError(err)
-
-    def destroy_osds(self) -> None:
-        for id in self.osd_ids:
-            destroy_args = {
-                'prefix': 'osd destroy-actual',
-                'id': int(id),
-                'yes_i_really_mean_it': True
-            }
-            ret, out, err = self.mon_command(destroy_args)
-            if ret != 0:
-                raise RuntimeError(err)
-
-    def remove(self) -> str:
-        try:
-            self.check_force()
-        except Exception as e:
-            log.exception("Error checking if OSDs are safe to destroy")
-            return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
-        try:
-            remove_result = self.remove_device_sets()
-        except Exception as e:
-            log.exception("Error patching ceph cluster CRD")
-            return f"Not possible to modify Ceph cluster CRD: {e}"
-        try:
-            self.scale_deployments()
-            self.delete_deployments()
-            self.clean_up_prepare_jobs_and_pvc()
-        except Exception as e:
-            log.exception("Ceph cluster CRD patched, but error cleaning environment")
-            return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
-        try:
-            self.set_osds_down()
-            self.set_osds_out()
-            if self.replace_flag:
-                self.destroy_osds()
-            else:
-                self.purge_osds()
-        except Exception as e:
-            log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
-            return f"Error removing OSDs from Ceph cluster: {e}"
-
-        return remove_result
-
-
-
 class RookCluster(object):
     # import of client.CoreV1Api must be optional at import time.
     # Instead allow mgr/rook to be imported anyway.
@@ -1101,7 +794,6 @@ class RookCluster(object):
                             name=spec.rgw_zone
                         )
             return object_store
-                
 
         def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
             if new.spec.gateway:
@@ -1193,48 +885,11 @@ class RookCluster(object):
             return new
         return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
 
-    def add_osds(self, drive_group, matching_hosts):
-        # type: (DriveGroupSpec, List[str]) -> str
-        assert drive_group.objectstore in ("bluestore", "filestore")
-        assert drive_group.service_id
-        storage_class = self.get_storage_class()
-        inventory = self.get_discovered_devices()
-        creator: Optional[DefaultCreator] = None
-        if (
-            storage_class.metadata.labels
-            and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
-        ):
-            creator = LSOCreator(inventory, self.coreV1_api, self.storage_class_name)
-        else:
-            creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class_name)
-        return self._patch(
-            ccl.CephCluster,
-            'cephclusters',
-            self.rook_env.cluster_name,
-            creator.add_osds(self.rook_pods, drive_group, matching_hosts)
-        )
-
-    def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
-        inventory = self.get_discovered_devices()
-        self.remover = DefaultRemover(
-            self.coreV1_api,
-            self.batchV1_api, 
-            self.appsV1_api, 
-            osd_ids, 
-            replace, 
-            force, 
-            mon_command, 
-            self._patch, 
-            self.rook_env,
-            inventory
-        )
-        return self.remover.remove()
-
     def get_hosts(self) -> List[orchestrator.HostSpec]:
         ret = []
         for node in self.nodes.items:
             spec = orchestrator.HostSpec(
-                node.metadata.name, 
+                node.metadata.name,
                 addr='/'.join([addr.address for addr in node.status.addresses]), 
                 labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
             )
@@ -1590,7 +1245,7 @@ def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) ->
             res.label = expression.key.split('/')[1]
         elif expression.key == "kubernetes.io/hostname":
             if expression.operator == "Exists":
-                res.host_pattern = "*"
+                res.host_pattern = HostPattern("*")
             elif expression.operator == "In": 
                 res.hosts = [HostPlacementSpec(hostname=value, network='', name='')for value in expression.values]
     return res
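
The last hunk is the one behavioural tweak rather than a removal: an "Exists" match on kubernetes.io/hostname now yields a HostPattern object instead of the bare string "*". A standalone sketch of that mapping (not part of the commit; it only mirrors the hunk above and assumes HostPattern accepts the pattern string as its first argument, as the diff implies):

    from typing import List
    from ceph.deployment.service_spec import HostPattern, HostPlacementSpec, PlacementSpec

    def hostname_term_to_placement(operator: str, values: List[str]) -> PlacementSpec:
        res = PlacementSpec()
        if operator == "Exists":
            # match every host; before this commit the bare string "*" was assigned
            res.host_pattern = HostPattern("*")
        elif operator == "In":
            res.hosts = [HostPlacementSpec(hostname=v, network='', name='') for v in values]
        return res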