default='local',
desc='storage class name for LSO-discovered PVs',
),
- Option(
- 'drive_group_interval',
- type='float',
- default=300.0,
- desc='interval in seconds between re-application of applied drive_groups',
- ),
]
@staticmethod
self.config_notify()
if TYPE_CHECKING:
self.storage_class = 'foo'
- self.drive_group_interval = 10.0
- self._load_drive_groups()
self._shutdown = threading.Event()
def config_notify(self) -> None:
self.log.debug(' mgr option %s = %s',
opt['name'], getattr(self, opt['name'])) # type: ignore
assert isinstance(self.storage_class, str)
- assert isinstance(self.drive_group_interval, float)
if self._rook_cluster:
self._rook_cluster.storage_class_name = self.storage_class
self._initialized.set()
self.config_notify()
- while not self._shutdown.is_set():
- self._apply_drivegroups(list(self._drive_group_map.values()))
- self._shutdown.wait(self.drive_group_interval)
-
@handle_orch_error
def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
host_list = None
running=sum_running_pods('osd')
)
- # drivegroups
- for name, dg in self._drive_group_map.items():
- spec[f'osd.{name}'] = orchestrator.ServiceDescription(
- spec=dg,
- last_refresh=now,
- size=0,
- running=0,
- )
-
if service_type == 'rbd-mirror' or service_type is None:
# rbd-mirrors
all_mirrors = self.rook_cluster.get_resource("cephrbdmirrors")
elif service_type == 'rbd-mirror':
return self.rook_cluster.rm_service('cephrbdmirrors', service_id)
elif service_type == 'osd':
- if service_id in self._drive_group_map:
- del self._drive_group_map[service_id]
- self._save_drive_groups()
return f'Removed {service_name}'
elif service_type == 'ingress':
self.log.info("{0} service '{1}' does not exist".format('ingress', service_id))
def remove_daemons(self, names: List[str]) -> List[str]:
return self.rook_cluster.remove_pods(names)
- def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
- for drive_group in specs:
- self._drive_group_map[str(drive_group.service_id)] = drive_group
- self._save_drive_groups()
- return OrchResult(self._apply_drivegroups(specs))
-
- def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]:
- all_hosts = raise_if_exception(self.get_hosts())
- result_list: List[str] = []
- for drive_group in ls:
- matching_hosts = drive_group.placement.filter_matching_hosts(
- lambda label=None, as_hostspec=None: all_hosts
- )
-
- if not self.rook_cluster.node_exists(matching_hosts[0]):
- raise RuntimeError("Node '{0}' is not in the Kubernetes "
- "cluster".format(matching_hosts))
-
- # Validate whether cluster CRD can accept individual OSD
- # creations (i.e. not useAllDevices)
- if not self.rook_cluster.can_create_osd():
- raise RuntimeError("Rook cluster configuration does not "
- "support OSD creation.")
- result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
- return result_list
-
- def _load_drive_groups(self) -> None:
- stored_drive_group = self.get_store("drive_group_map")
- self._drive_group_map: Dict[str, DriveGroupSpec] = {}
- if stored_drive_group:
- for name, dg in json.loads(stored_drive_group).items():
- try:
- self._drive_group_map[name] = DriveGroupSpec.from_json(dg)
- except ValueError as e:
- self.log.error(f'Failed to load drive group {name} ({dg}): {e}')
-
- def _save_drive_groups(self) -> None:
- json_drive_group_map = {
- name: dg.to_json() for name, dg in self._drive_group_map.items()
- }
- self.set_store("drive_group_map", json.dumps(json_drive_group_map))
-
- def remove_osds(self,
- osd_ids: List[str],
- replace: bool = False,
- force: bool = False,
- zap: bool = False,
- no_destroy: bool = False) -> OrchResult[str]:
- assert self._rook_cluster is not None
- if zap:
- raise RuntimeError("Rook does not support zapping devices during OSD removal.")
- res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command)
- return OrchResult(res)
-
def add_host_label(self, host: str, label: str) -> OrchResult[str]:
return self.rook_cluster.add_host_label(host, label)
-
+
def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
return self.rook_cluster.remove_host_label(host, label)
- """
- @handle_orch_error
- def create_osds(self, drive_group):
- # type: (DriveGroupSpec) -> str
- # Creates OSDs from a drive group specification.
-
- # $: ceph orch osd create -i <dg.file>
-
- # The drivegroup file must only contain one spec at a time.
- #
-
- targets = [] # type: List[str]
- if drive_group.data_devices and drive_group.data_devices.paths:
- targets += [d.path for d in drive_group.data_devices.paths]
- if drive_group.data_directories:
- targets += drive_group.data_directories
-
- all_hosts = raise_if_exception(self.get_hosts())
-
- matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
-
- assert len(matching_hosts) == 1
-
- if not self.rook_cluster.node_exists(matching_hosts[0]):
- raise RuntimeError("Node '{0}' is not in the Kubernetes "
- "cluster".format(matching_hosts))
-
- # Validate whether cluster CRD can accept individual OSD
- # creations (i.e. not useAllDevices)
- if not self.rook_cluster.can_create_osd():
- raise RuntimeError("Rook cluster configuration does not "
- "support OSD creation.")
-
- return self.rook_cluster.add_osds(drive_group, matching_hosts)
-
- # TODO: this was the code to update the progress reference:
-
- @handle_orch_error
- def has_osds(matching_hosts: List[str]) -> bool:
-
- # Find OSD pods on this host
- pod_osd_ids = set()
- pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
- label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
- field_selector="spec.nodeName={0}".format(
- matching_hosts[0]
- )).items
- for p in pods:
- pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
-
- self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))
-
- found = []
- osdmap = self.get("osd_map")
- for osd in osdmap['osds']:
- osd_id = osd['osd']
- if osd_id not in pod_osd_ids:
- continue
-
- metadata = self.get_metadata('osd', "%s" % osd_id)
- if metadata and metadata['devices'] in targets:
- found.append(osd_id)
- else:
- self.log.info("ignoring osd {0} {1}".format(
- osd_id, metadata['devices'] if metadata else 'DNE'
- ))
-
- return found is not None
- """
@handle_orch_error
def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
from ceph.deployment.inventory import Device
from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
+from ceph.deployment.service_spec import (
+ ServiceSpec,
+ NFSServiceSpec,
+ RGWSpec,
+ PlacementSpec,
+ HostPlacementSpec,
+ HostPattern,
+)
from ceph.utils import datetime_now
-from ceph.deployment.drive_selection.matchers import SizeMatcher
+from ceph.deployment.drive_selection.matchers import (
+ AllMatcher,
+ Matcher,
+ SizeMatcher,
+)
from nfs.cluster import create_ganesha_pool
from nfs.module import Module
from nfs.export import NFSRados
"{} doesn't contain a metadata.name. Unable to track changes".format(
self.api_func))
-class DefaultCreator():
- def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class_name: 'str'):
- self.coreV1_api = coreV1_api
- self.storage_class_name = storage_class_name
- self.inventory = inventory
-
- def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
- device_set = ccl.StorageClassDeviceSetsItem(
- name=d.sys_api['pv_name'],
- volumeClaimTemplates= ccl.VolumeClaimTemplatesList(),
- count=1,
- encrypted=drive_group.encrypted,
- portable=False
- )
- device_set.volumeClaimTemplates.append(
- ccl.VolumeClaimTemplatesItem(
- metadata=ccl.Metadata(
- name="data"
- ),
- spec=ccl.Spec(
- storageClassName=self.storage_class_name,
- volumeMode="Block",
- accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
- resources={
- "requests":{
- "storage": 1
- }
- },
- volumeName=d.sys_api['pv_name']
- )
- )
- )
- return device_set
-
- def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
- device_list = []
- assert drive_group.data_devices is not None
- sizematcher: Optional[SizeMatcher] = None
- if drive_group.data_devices.size:
- sizematcher = SizeMatcher('size', drive_group.data_devices.size)
- limit = getattr(drive_group.data_devices, 'limit', None)
- count = 0
- all = getattr(drive_group.data_devices, 'all', None)
- paths = [device.path for device in drive_group.data_devices.paths]
- osd_list = []
- for pod in rook_pods.items:
- if (
- hasattr(pod, 'metadata')
- and hasattr(pod.metadata, 'labels')
- and 'osd' in pod.metadata.labels
- and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
- ):
- osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
- for _, node in self.inventory.items():
- for device in node:
- if device.sys_api['pv_name'] in osd_list:
- count += 1
- for _, node in self.inventory.items():
- for device in node:
- if not limit or (count < limit):
- if device.available:
- if (
- all
- or (
- device.sys_api['node'] in matching_hosts
-                            and (sizematcher is None or sizematcher.compare(device))
- and (
- not drive_group.data_devices.paths
- or (device.path in paths)
- )
- )
- ):
- device_list.append(device)
- count += 1
-
- return device_list
-
- def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
- to_create = self.filter_devices(rook_pods, drive_group,matching_hosts)
- assert drive_group.data_devices is not None
- def _add_osds(current_cluster, new_cluster):
- # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
- if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
- new_cluster.spec.storage = ccl.Storage()
-
- if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
- new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
-
- existing_scds = [
- scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
- ]
- for device in to_create:
- new_scds = self.device_to_device_set(drive_group, device)
- if new_scds.name not in existing_scds:
- new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
- return new_cluster
- return _add_osds
-
-class LSOCreator(DefaultCreator):
- def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
- device_list = []
- assert drive_group.data_devices is not None
- sizematcher = None
- if drive_group.data_devices.size:
- sizematcher = SizeMatcher('size', drive_group.data_devices.size)
- limit = getattr(drive_group.data_devices, 'limit', None)
- all = getattr(drive_group.data_devices, 'all', None)
- paths = [device.path for device in drive_group.data_devices.paths]
- vendor = getattr(drive_group.data_devices, 'vendor', None)
- model = getattr(drive_group.data_devices, 'model', None)
- count = 0
- osd_list = []
- for pod in rook_pods.items:
- if (
- hasattr(pod, 'metadata')
- and hasattr(pod.metadata, 'labels')
- and 'osd' in pod.metadata.labels
- and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
- ):
- osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
- for _, node in self.inventory.items():
- for device in node:
- if device.sys_api['pv_name'] in osd_list:
- count += 1
- for _, node in self.inventory.items():
- for device in node:
- if not limit or (count < limit):
- if device.available:
- if (
- all
- or (
- device.sys_api['node'] in matching_hosts
-                            and (sizematcher is None or sizematcher.compare(device))
- and (
- not drive_group.data_devices.paths
- or device.path in paths
- )
- and (
- not vendor
- or device.sys_api['vendor'] == vendor
- )
- and (
- not model
-                                or device.sys_api['model'].startswith(model)
- )
- )
- ):
- device_list.append(device)
- count += 1
- return device_list
-
-class DefaultRemover():
- def __init__(
- self,
- coreV1_api: 'client.CoreV1Api',
- batchV1_api: 'client.BatchV1Api',
- appsV1_api: 'client.AppsV1Api',
- osd_ids: List[str],
- replace_flag: bool,
- force_flag: bool,
- mon_command: Callable,
- patch: Callable,
- rook_env: 'RookEnv',
- inventory: Dict[str, List[Device]]
- ):
- self.batchV1_api = batchV1_api
- self.appsV1_api = appsV1_api
- self.coreV1_api = coreV1_api
-
- self.osd_ids = osd_ids
- self.replace_flag = replace_flag
- self.force_flag = force_flag
-
- self.mon_command = mon_command
-
- self.patch = patch
- self.rook_env = rook_env
-
- self.inventory = inventory
- self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod,
- namespace=self.rook_env.namespace,
- label_selector='app=rook-ceph-osd')
- self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job,
- namespace=self.rook_env.namespace,
- label_selector='app=rook-ceph-osd-prepare')
- self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim,
- namespace=self.rook_env.namespace)
-
-
- def remove_device_sets(self) -> str:
- self.to_remove: Dict[str, int] = {}
- self.pvc_to_remove: List[str] = []
- for pod in self.osd_pods.items:
- if (
- hasattr(pod, 'metadata')
- and hasattr(pod.metadata, 'labels')
- and 'osd' in pod.metadata.labels
- and pod.metadata.labels['osd'] in self.osd_ids
- ):
- if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
- self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
- else:
- self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
- self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
- def _remove_osds(current_cluster, new_cluster):
- # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
- assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
- for _set in new_cluster.spec.storage.storageClassDeviceSets:
- if _set.name in self.to_remove:
- if _set.count == self.to_remove[_set.name]:
- new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
- else:
- _set.count = _set.count - self.to_remove[_set.name]
- return new_cluster
- return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
-
- def check_force(self) -> None:
- if not self.force_flag:
- safe_args = {'prefix': 'osd safe-to-destroy',
- 'ids': [str(x) for x in self.osd_ids]}
- ret, out, err = self.mon_command(safe_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def set_osds_down(self) -> None:
- down_flag_args = {
- 'prefix': 'osd down',
- 'ids': [str(x) for x in self.osd_ids]
- }
- ret, out, err = self.mon_command(down_flag_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def scale_deployments(self) -> None:
- for osd_id in self.osd_ids:
- self.appsV1_api.patch_namespaced_deployment_scale(namespace=self.rook_env.namespace,
- name='rook-ceph-osd-{}'.format(osd_id),
- body=client.V1Scale(spec=client.V1ScaleSpec(replicas=0)))
-
- def set_osds_out(self) -> None:
- out_flag_args = {
- 'prefix': 'osd out',
- 'ids': [str(x) for x in self.osd_ids]
- }
- ret, out, err = self.mon_command(out_flag_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def delete_deployments(self) -> None:
- for osd_id in self.osd_ids:
- self.appsV1_api.delete_namespaced_deployment(namespace=self.rook_env.namespace,
- name='rook-ceph-osd-{}'.format(osd_id),
- propagation_policy='Foreground')
-
- def clean_up_prepare_jobs_and_pvc(self) -> None:
- for job in self.jobs.items:
- if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
- self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace=self.rook_env.namespace,
- propagation_policy='Foreground')
- self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'],
- namespace=self.rook_env.namespace,
- propagation_policy='Foreground')
-
- def purge_osds(self) -> None:
- for id in self.osd_ids:
- purge_args = {
- 'prefix': 'osd purge-actual',
- 'id': int(id),
- 'yes_i_really_mean_it': True
- }
- ret, out, err = self.mon_command(purge_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def destroy_osds(self) -> None:
- for id in self.osd_ids:
- destroy_args = {
- 'prefix': 'osd destroy-actual',
- 'id': int(id),
- 'yes_i_really_mean_it': True
- }
- ret, out, err = self.mon_command(destroy_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def remove(self) -> str:
- try:
- self.check_force()
- except Exception as e:
- log.exception("Error checking if OSDs are safe to destroy")
- return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
- try:
- remove_result = self.remove_device_sets()
- except Exception as e:
- log.exception("Error patching ceph cluster CRD")
- return f"Not possible to modify Ceph cluster CRD: {e}"
- try:
- self.scale_deployments()
- self.delete_deployments()
- self.clean_up_prepare_jobs_and_pvc()
- except Exception as e:
- log.exception("Ceph cluster CRD patched, but error cleaning environment")
- return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
- try:
- self.set_osds_down()
- self.set_osds_out()
- if self.replace_flag:
- self.destroy_osds()
- else:
- self.purge_osds()
- except Exception as e:
- log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
- return f"Error removing OSDs from Ceph cluster: {e}"
-
- return remove_result
-
-
-
class RookCluster(object):
# import of client.CoreV1Api must be optional at import time.
# Instead allow mgr/rook to be imported anyway.
name=spec.rgw_zone
)
return object_store
-
def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
if new.spec.gateway:
return new
return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
- def add_osds(self, drive_group, matching_hosts):
- # type: (DriveGroupSpec, List[str]) -> str
- assert drive_group.objectstore in ("bluestore", "filestore")
- assert drive_group.service_id
- storage_class = self.get_storage_class()
- inventory = self.get_discovered_devices()
- creator: Optional[DefaultCreator] = None
- if (
- storage_class.metadata.labels
- and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
- ):
- creator = LSOCreator(inventory, self.coreV1_api, self.storage_class_name)
- else:
- creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class_name)
- return self._patch(
- ccl.CephCluster,
- 'cephclusters',
- self.rook_env.cluster_name,
- creator.add_osds(self.rook_pods, drive_group, matching_hosts)
- )
-
- def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
- inventory = self.get_discovered_devices()
- self.remover = DefaultRemover(
- self.coreV1_api,
- self.batchV1_api,
- self.appsV1_api,
- osd_ids,
- replace,
- force,
- mon_command,
- self._patch,
- self.rook_env,
- inventory
- )
- return self.remover.remove()
-
def get_hosts(self) -> List[orchestrator.HostSpec]:
ret = []
for node in self.nodes.items:
spec = orchestrator.HostSpec(
- node.metadata.name,
+ node.metadata.name,
addr='/'.join([addr.address for addr in node.status.addresses]),
labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
)
res.label = expression.key.split('/')[1]
elif expression.key == "kubernetes.io/hostname":
if expression.operator == "Exists":
- res.host_pattern = "*"
+ res.host_pattern = HostPattern("*")
elif expression.operator == "In":
res.hosts = [HostPlacementSpec(hostname=value, network='', name='') for value in expression.values]
return res