From caa7b9828e152e1cfd8dd583dfdf72474661d9c9 Mon Sep 17 00:00:00 2001
From: Joseph Sawaya
Date: Mon, 19 Jul 2021 09:50:39 -0400
Subject: [PATCH] mgr/rook: keep drive groups updated and clean up cluster CR

This commit creates a new threaded method on the RookCluster class that
keeps the cluster updated by re-applying the stored drive groups in a
loop. It also removes the dead _add_osds helper that was left
unreachable after the return statement.

Signed-off-by: Joseph Sawaya
---
 src/pybind/mgr/rook/rook_cluster.py | 72 ++++++-----------------------
 1 file changed, 14 insertions(+), 58 deletions(-)

diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index 12ee80fd87c80..9f69447bca267 100644
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -489,7 +489,9 @@ class RookCluster(object):
                                             label_selector="rook_cluster={0}".format(
                                                 self.rook_env.namespace))
         self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
-
+        self.drive_group_map: Dict[str, Any] = {}
+        self.drive_group_lock = threading.Lock()
+
     def rook_url(self, path: str) -> str:
         prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
             self.rook_env.crd_version, self.rook_env.namespace)
@@ -849,64 +851,18 @@ class RookCluster(object):
         else:
             self.creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
         _add_osds = self.creator.add_osds(self.rook_pods, drive_group, matching_hosts)
-        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
-
-        def _add_osds(current_cluster, new_cluster):
-            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
-
-            # FIXME: this is all not really atomic, because jsonpatch doesn't
-            # let us do "test" operations that would check if items with
-            # matching names were in existing lists.
-            if not new_cluster.spec.storage:
-                raise orchestrator.OrchestratorError('new_cluster missing storage attribute')
-
-            if not hasattr(new_cluster.spec.storage, 'nodes'):
-                new_cluster.spec.storage.nodes = ccl.NodesList()
-
-            current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
-            matching_host = matching_hosts[0]
-
-            if matching_host not in [n.name for n in current_nodes]:
-                # FIXME: ccl.Config stopped existing since rook changed
-                # their CRDs, check if config is actually necessary for this
-
-                pd = ccl.NodesItem(
-                    name=matching_host,
-                    config=ccl.Config(
-                        storeType=drive_group.objectstore
-                    )
-                )
-
-                if block_devices:
-                    pd.devices = ccl.DevicesList(
-                        ccl.DevicesItem(name=d.path) for d in block_devices
-                    )
-                if directories:
-                    pd.directories = ccl.DirectoriesList(
-                        ccl.DirectoriesItem(path=p) for p in directories
-                    )
-                new_cluster.spec.storage.nodes.append(pd)
-            else:
-                for _node in new_cluster.spec.storage.nodes:
-                    current_node = _node  # type: ccl.NodesItem
-                    if current_node.name == matching_host:
-                        if block_devices:
-                            if not hasattr(current_node, 'devices'):
-                                current_node.devices = ccl.DevicesList()
-                            current_device_names = set(d.name for d in current_node.devices)
-                            new_devices = [bd for bd in block_devices if bd.path not in current_device_names]
-                            current_node.devices.extend(
-                                ccl.DevicesItem(name=n.path) for n in new_devices
-                            )
+        with self.drive_group_lock:
+            self.drive_group_map[drive_group.service_id] = _add_osds
+            return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
 
-                        if directories:
-                            if not hasattr(current_node, 'directories'):
-                                current_node.directories = ccl.DirectoriesList()
-                            new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
-                            current_node.directories.extend(
-                                ccl.DirectoriesItem(path=n) for n in new_dirs
-                            )
-            return new_cluster
+    @threaded
+    def drive_group_loop(self) -> None:
+        ten_minutes = 10 * 60
+        while True:
+            sleep(ten_minutes)
+            with self.drive_group_lock:
+                for _, add_osd in self.drive_group_map.items():
+                    self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, add_osd)
 
     def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
         current_json = self.rook_api_get(
-- 
2.39.5
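
Note: the new code relies on a `threaded` decorator and a `sleep` import
that are defined elsewhere in the rook module and do not appear in this
diff. As a rough sketch only (an assumption, not the module's actual
definition), a decorator compatible with how `drive_group_loop` is used
could look like this:

    # Illustrative sketch, not the rook module's implementation: calling
    # the decorated method spawns a daemon thread running the original
    # function, so drive_group_loop() returns immediately and the
    # ten-minute sleep loop cannot block mgr shutdown.
    import threading
    from functools import wraps
    from typing import Any, Callable

    def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
            t = threading.Thread(target=f, args=args, kwargs=kwargs)
            t.daemon = True
            t.start()
            return t
        return wrapper

Under that assumption, a caller would start the loop once (e.g.
`self.rook_cluster.drive_group_loop()` during orchestrator startup) and
every stored drive group is then re-applied through `_patch` each cycle.
Taking `drive_group_lock` around both the insertion in `add_osds` and
the iteration in the loop keeps the two from racing on
`drive_group_map`.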