self.storage_classes : KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)
self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
- namespace=self.rook_env.namespace,
- label_selector="rook_cluster={0}".format(
- self.rook_env.namespace))
+ namespace=self.rook_env.namespace,
+ label_selector="rook_cluster={0}".format(
+ self.rook_env.namespace))
self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
-
+
def rook_url(self, path: str) -> str:
prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
self.rook_env.crd_version, self.rook_env.namespace)
return matching_sc[0]
def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
- storage_class = self.get_storage_class()
self.fetcher: Optional[DefaultFetcher] = None
- if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
- self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
+ op_settings = self.coreV1_api.read_namespaced_config_map(name="rook-ceph-operator-config", namespace='rook-ceph').data
+ if op_settings.get('ROOK_ENABLE_DISCOVERY_DAEMON', 'false').lower() == 'true':
+ self.fetcher = PDFetcher(self.coreV1_api)
else:
- self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
+ storage_class = self.get_storage_class()
+ if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
+ self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
+ else:
+ self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
+
self.fetcher.fetch()
return self.fetcher.devices()
-
+
def get_osds(self) -> List:
- osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
+ osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod,
+ namespace=self.rook_env.namespace,
+ label_selector='app=rook-ceph-osd')
return list(osd_pods.items)
def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]: