label_selector="rook_cluster={0}".format(
self.rook_env.namespace))
self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
def rook_url(self, path: str) -> str:
prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
assert drive_group.service_id
storage_class = self.get_storage_class()
inventory = self.get_discovered_devices()
creator: Optional[DefaultCreator] = None
if (
    storage_class.metadata.labels
    and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
):
    creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
else:
    creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
return self._patch(
    ccl.CephCluster,
    'cephclusters',
    self.rook_env.cluster_name,
    creator.add_osds(self.rook_pods, drive_group, matching_hosts)
)
def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
inventory = self.get_discovered_devices()