mgr/rook: persist applied drive groups and fix drive_group_loop
author    Joseph Sawaya <jsawaya@redhat.com>
          Wed, 15 Sep 2021 19:59:29 +0000 (15:59 -0400)
committer Sage Weil <sage@newdream.net>
          Thu, 4 Nov 2021 15:49:50 +0000 (10:49 -0500)
This commit changes OSD creation to store applied drive groups in a map
and persist that map in the mon's KV store. It also modifies the rook
orchestrator's serve loop to periodically re-apply the stored drive
groups, and adds a module option to specify the interval between
re-applies.

Signed-off-by: Joseph Sawaya <jsawaya@redhat.com>
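
For reference, the new interval can be tuned like any other mgr module
option; the 600-second value below is purely illustrative:

    ceph config set mgr mgr/rook/drive_group_interval 600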
src/pybind/mgr/rook/module.py

index c0c1c32c9f27ba04f47eb0e4aea6ca0b2e3137c2..4ba044413a2654aba1927a2ed5c7b6e0f355df27 100644
@@ -9,7 +9,7 @@ from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
 from ceph.utils import datetime_now
 
-from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple
+from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple, TYPE_CHECKING
 
 try:
     from ceph.deployment.drive_group import DriveGroupSpec
@@ -81,6 +81,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             default='local',
             desc='storage class name for LSO-discovered PVs',
         ),
+        Option(
+            'drive_group_interval',
+            type='float',
+            default=300.0,
+            desc='interval in seconds between re-application of applied drive_groups',
+        ),
     ]
 
     @staticmethod
@@ -115,8 +121,13 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._rook_cluster: Optional[RookCluster] = None
         self._rook_env = RookEnv()
         self._k8s_AppsV1_api: Optional[client.AppsV1Api] = None
-        self.storage_class = self.get_module_option('storage_class')
 
+        self.config_notify()
+        if TYPE_CHECKING:
+            self.storage_class = 'foo'
+            self.drive_group_interval = 10.0
+
+        self._load_drive_groups()
         self._shutdown = threading.Event()
         
     def config_notify(self) -> None:
@@ -132,7 +143,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             self.log.debug(' mgr option %s = %s',
                            opt['name'], getattr(self, opt['name']))  # type: ignore
         assert isinstance(self.storage_class, str)
-        self.rook_cluster.storage_class = self.storage_class
+        assert isinstance(self.drive_group_interval, float)
+
+        if self._rook_cluster:
+            self._rook_cluster.storage_class = self.storage_class
 
     def shutdown(self) -> None:
         self._shutdown.set()
@@ -194,9 +208,11 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             self.storage_class)
 
         self._initialized.set()
+        self.config_notify()
 
         while not self._shutdown.is_set():
-            self._shutdown.wait(5)
+            self._apply_drivegroups(list(self._drive_group_map.values()))
+            self._shutdown.wait(self.drive_group_interval)
 
     @handle_orch_error
     def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
@@ -523,10 +539,18 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         return self.rook_cluster.remove_pods(names)
 
     def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
-        result_list = []
-        all_hosts = raise_if_exception(self.get_hosts())
         for drive_group in specs:
-            matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
+            self._drive_group_map[str(drive_group.service_id)] = drive_group
+        self._save_drive_groups()
+        return OrchResult(self._apply_drivegroups(specs))
+
+    def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]:
+        all_hosts = raise_if_exception(self.get_hosts())
+        result_list: List[str] = []
+        for drive_group in ls:
+            matching_hosts = drive_group.placement.filter_matching_hosts(
+                lambda label=None, as_hostspec=None: all_hosts
+            )
 
             if not self.rook_cluster.node_exists(matching_hosts[0]):
                 raise RuntimeError("Node '{0}' is not in the Kubernetes "
@@ -538,7 +562,23 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 raise RuntimeError("Rook cluster configuration does not "
                                 "support OSD creation.")
             result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
-        return OrchResult(result_list)
+        return result_list
+
+    def _load_drive_groups(self) -> None:
+        stored_drive_group = self.get_store("drive_group_map")
+        self._drive_group_map: Dict[str, DriveGroupSpec] = {}
+        if stored_drive_group:
+            for name, dg in json.loads(stored_drive_group).items():
+                try:
+                    self._drive_group_map[name] = DriveGroupSpec.from_json(dg)
+                except ValueError as e:
+                    self.log.error(f'Failed to load drive group {name} ({dg}): {e}')
+
+    def _save_drive_groups(self) -> None:
+        json_drive_group_map = {
+            name: dg.to_json() for name, dg in self._drive_group_map.items()
+        }
+        self.set_store("drive_group_map", json.dumps(json_drive_group_map))
 
     def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False, zap: bool = False) -> OrchResult[str]:
         assert self._rook_cluster is not None
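
As a rough standalone illustration of the persist-and-re-apply pattern
introduced above, the sketch below mimics the behaviour with a stand-in
key/value store. FakeKVStore, save_drive_groups and serve here are
hypothetical names, not part of the Rook module; the real module uses
MgrModule.get_store()/set_store() and DriveGroupSpec JSON round-trips.

# Minimal standalone sketch (not part of the Rook module) of persisting a
# drive group map and periodically re-applying it from a serve-style loop.
import json
import threading
from typing import Dict


class FakeKVStore:
    """Toy replacement for the mgr's mon-backed key/value store."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def set_store(self, key: str, value: str) -> None:
        self._data[key] = value

    def get_store(self, key: str) -> str:
        return self._data.get(key, "")


def save_drive_groups(store: FakeKVStore, drive_groups: Dict[str, dict]) -> None:
    # Persist the whole map as one JSON blob, as _save_drive_groups() does.
    store.set_store("drive_group_map", json.dumps(drive_groups))


def serve(store: FakeKVStore, shutdown: threading.Event, interval: float) -> None:
    # Re-read and "re-apply" every persisted drive group until shutdown is
    # requested, mirroring the loop added to RookOrchestrator.serve().
    while not shutdown.is_set():
        stored = store.get_store("drive_group_map")
        for name, spec in (json.loads(stored) if stored else {}).items():
            print(f"re-applying drive group {name}: {spec}")
        shutdown.wait(interval)


if __name__ == "__main__":
    store = FakeKVStore()
    # Toy spec dict; the real map holds DriveGroupSpec.to_json() output.
    save_drive_groups(store, {"all_devices": {"data_devices": {"all": True}}})
    stop = threading.Event()
    threading.Timer(3.0, stop.set).start()  # stop the loop after a few seconds
    serve(store, stop, interval=1.0)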