label_selector="rook_cluster={0}".format(
self.rook_env.namespace))
self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
-
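+        # Track the OSD drive groups that have been applied, keyed by
+        # service_id, so drive_group_loop() below can periodically re-apply
+        # them to the CephCluster CR; drive_group_lock guards access to the
+        # map (threading and typing imports are assumed at module level).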
+        self.drive_group_map: Dict[str, Any] = {}
+        self.drive_group_lock = threading.Lock()
+
def rook_url(self, path: str) -> str:
prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
self.rook_env.crd_version, self.rook_env.namespace)
else:
self.creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
_add_osds = self.creator.add_osds(self.rook_pods, drive_group, matching_hosts)
-        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
-
-        def _add_osds(current_cluster, new_cluster):
-            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
-
-            # FIXME: this is all not really atomic, because jsonpatch doesn't
-            # let us do "test" operations that would check if items with
-            # matching names were in existing lists.
-            if not new_cluster.spec.storage:
-                raise orchestrator.OrchestratorError('new_cluster missing storage attribute')
-
-            if not hasattr(new_cluster.spec.storage, 'nodes'):
-                new_cluster.spec.storage.nodes = ccl.NodesList()
-
-            current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
-            matching_host = matching_hosts[0]
-
-            if matching_host not in [n.name for n in current_nodes]:
-                # FIXME: ccl.Config stopped existing since rook changed
-                # their CRDs, check if config is actually necessary for this
-
-                pd = ccl.NodesItem(
-                    name=matching_host,
-                    config=ccl.Config(
-                        storeType=drive_group.objectstore
-                    )
-                )
-
-                if block_devices:
-                    pd.devices = ccl.DevicesList(
-                        ccl.DevicesItem(name=d.path) for d in block_devices
-                    )
-                if directories:
-                    pd.directories = ccl.DirectoriesList(
-                        ccl.DirectoriesItem(path=p) for p in directories
-                    )
-                new_cluster.spec.storage.nodes.append(pd)
-            else:
-                for _node in new_cluster.spec.storage.nodes:
-                    current_node = _node  # type: ccl.NodesItem
-                    if current_node.name == matching_host:
-                        if block_devices:
-                            if not hasattr(current_node, 'devices'):
-                                current_node.devices = ccl.DevicesList()
-                            current_device_names = set(d.name for d in current_node.devices)
-                            new_devices = [bd for bd in block_devices if bd.path not in current_device_names]
-                            current_node.devices.extend(
-                                ccl.DevicesItem(name=n.path) for n in new_devices
-                            )
-                        if directories:
-                            if not hasattr(current_node, 'directories'):
-                                current_node.directories = ccl.DirectoriesList()
-                            new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
-                            current_node.directories.extend(
-                                ccl.DirectoriesItem(path=n) for n in new_dirs
-                            )
-            return new_cluster
+        with self.drive_group_lock:
+            self.drive_group_map[drive_group.service_id] = _add_osds
+            return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
+
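+    # Re-apply every drive group stored in drive_group_map on a fixed interval.
+    # This assumes a `threaded` decorator that runs the wrapped method in a
+    # background thread, and a `sleep` helper (presumably `from time import
+    # sleep`); neither is shown in this hunk.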
+    @threaded
+    def drive_group_loop(self) -> None:
+        ten_minutes = 10 * 60
+        while True:
+            sleep(ten_minutes)
+            with self.drive_group_lock:
+                for _, add_osd in self.drive_group_map.items():
+                    self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, add_osd)
def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
current_json = self.rook_api_get(