git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rook: keep drive groups updated and clean up cluster CR
author    Joseph Sawaya <jsawaya@redhat.com>
          Mon, 19 Jul 2021 13:50:39 +0000 (09:50 -0400)
committer Joseph Sawaya <jsawaya@redhat.com>
          Tue, 17 Aug 2021 14:50:27 +0000 (10:50 -0400)
This commit creates a new threaded method on the RookCluster class
that keeps the cluster updated by re-applying drive groups in a
loop.

Signed-off-by: Joseph Sawaya <jsawaya@redhat.com>
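
For context, the pattern the diff below introduces looks roughly like the
following minimal, self-contained sketch: a decorator that runs a method in a
daemon thread, plus a loop that periodically re-applies registered callbacks
under a lock. The class name, `reapply_loop`, and `apply_fn` are illustrative
only; in the actual patch the registered callback is the `_add_osds` closure,
keyed by the drive group's service_id and replayed through
RookCluster._patch() against the CephCluster CR, and the `threaded` decorator
is assumed to be provided elsewhere in the rook module.

    import threading
    from time import sleep
    from typing import Any, Callable, Dict


    def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
        # Run the wrapped function in a background daemon thread.
        def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
            t = threading.Thread(target=f, args=args, kwargs=kwargs, daemon=True)
            t.start()
            return t
        return wrapper


    class DriveGroupReapplier:
        def __init__(self) -> None:
            # Maps a drive group id to the callback that re-applies it.
            self.drive_group_map: Dict[str, Callable[[], None]] = {}
            self.drive_group_lock = threading.Lock()

        @threaded
        def reapply_loop(self, interval: float = 10 * 60) -> None:
            # Periodically re-apply every registered drive group.
            while True:
                sleep(interval)
                with self.drive_group_lock:
                    for _, apply_fn in self.drive_group_map.items():
                        apply_fn()
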
src/pybind/mgr/rook/rook_cluster.py

index 12ee80fd87c800c2e8483f557602e31dfc6dcfb4..9f69447bca267b8fbddfd71b3322ed4e5a9e6646 100644 (file)
@@ -489,7 +489,9 @@ class RookCluster(object):
                                             label_selector="rook_cluster={0}".format(
                                                 self.rook_env.namespace))
         self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
-
+        self.drive_group_map: Dict[str, Any] = {}
+        self.drive_group_lock = threading.Lock()
+
     def rook_url(self, path: str) -> str:
         prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
             self.rook_env.crd_version, self.rook_env.namespace)
@@ -849,64 +851,18 @@ class RookCluster(object):
         else:
             self.creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
         _add_osds = self.creator.add_osds(self.rook_pods, drive_group, matching_hosts)
-        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
-
-        def _add_osds(current_cluster, new_cluster):
-            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
-
-            # FIXME: this is all not really atomic, because jsonpatch doesn't
-            # let us do "test" operations that would check if items with
-            # matching names were in existing lists.
-            if not new_cluster.spec.storage:
-                raise orchestrator.OrchestratorError('new_cluster missing storage attribute')
-
-            if not hasattr(new_cluster.spec.storage, 'nodes'):
-                new_cluster.spec.storage.nodes = ccl.NodesList()
-
-            current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
-            matching_host = matching_hosts[0]
-            
-            if matching_host not in [n.name for n in current_nodes]:
-                # FIXME: ccl.Config stopped existing since rook changed
-                # their CRDs, check if config is actually necessary for this
-                
-                pd = ccl.NodesItem(
-                    name=matching_host,
-                    config=ccl.Config(
-                        storeType=drive_group.objectstore
-                    )
-                )
-
-                if block_devices:
-                    pd.devices = ccl.DevicesList(
-                        ccl.DevicesItem(name=d.path) for d in block_devices
-                    )
-                if directories:
-                    pd.directories = ccl.DirectoriesList(
-                        ccl.DirectoriesItem(path=p) for p in directories
-                    )
-                new_cluster.spec.storage.nodes.append(pd)
-            else:
-                for _node in new_cluster.spec.storage.nodes:
-                    current_node = _node  # type: ccl.NodesItem
-                    if current_node.name == matching_host:
-                        if block_devices:
-                            if not hasattr(current_node, 'devices'):
-                                current_node.devices = ccl.DevicesList()
-                            current_device_names = set(d.name for d in current_node.devices)
-                            new_devices = [bd for bd in block_devices if bd.path not in current_device_names]
-                            current_node.devices.extend(
-                                ccl.DevicesItem(name=n.path) for n in new_devices
-                            )
+        with self.drive_group_lock:
+            self.drive_group_map[drive_group.service_id] = _add_osds
+            return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
 
-                        if directories:
-                            if not hasattr(current_node, 'directories'):
-                                current_node.directories = ccl.DirectoriesList()
-                            new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
-                            current_node.directories.extend(
-                                ccl.DirectoriesItem(path=n) for n in new_dirs
-                            )
-            return new_cluster
+    @threaded
+    def drive_group_loop(self) -> None:
+        ten_minutes = 10 * 60
+        while True:
+            sleep(ten_minutes)
+            with self.drive_group_lock:
+                for _, add_osd in self.drive_group_map.items():
+                    self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, add_osd)
 
     def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
         current_json = self.rook_api_get(