]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: lock multithreaded access to OSDRemovalQueue 38815/head
authorSebastian Wagner <sebastian.wagner@suse.com>
Fri, 8 Jan 2021 12:54:46 +0000 (13:54 +0100)
committerSebastian Wagner <sebastian.wagner@suse.com>
Thu, 14 Jan 2021 12:00:46 +0000 (13:00 +0100)
Since the set can be changed also from the CLI thread

Fixes: https://tracker.ceph.com/issues/47700
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/cephadm/services/osd.py

index a8c589e489ea119fa8a2c2486b42f7ad0b8242de..5516b575273ba461a49e34724f6914a9a4ab7a3e 100644 (file)
@@ -1,5 +1,6 @@
 import json
 import logging
+from threading import Lock
 from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional, TYPE_CHECKING
 
 from ceph.deployment import translate
@@ -602,28 +603,36 @@ class OSDRemovalQueue(object):
         self.osds: Set[OSD] = set()
         self.rm_util = RemoveUtil(mgr)
 
+        # locks multithreaded access to self.osds. Please avoid locking
+        # network calls, like mon commands.
+        self.lock = Lock()
+
     def process_removal_queue(self) -> None:
         """
         Performs actions in the _serve() loop to remove an OSD
         when criteria is met.
+
+        we can't hold self.lock, as we're calling _remove_daemon in the loop
         """
 
         # make sure that we don't run on OSDs that are not in the cluster anymore.
         self.cleanup()
 
-        logger.debug(
-            f"{self.queue_size()} OSDs are scheduled "
-            f"for removal: {self.all_osds()}")
-
         # find osds that are ok-to-stop and not yet draining
         ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
         if ok_to_stop_osds:
             # start draining those
             _ = [osd.start_draining() for osd in ok_to_stop_osds]
 
+        all_osds = self.all_osds()
+
+        logger.debug(
+            f"{self.queue_size()} OSDs are scheduled "
+            f"for removal: {all_osds}")
+
         # Check all osds for their state and take action (remove, purge etc)
         new_queue: Set[OSD] = set()
-        for osd in self.all_osds():  # type: OSD
+        for osd in all_osds:  # type: OSD
             if not osd.force:
                 # skip criteria
                 if not osd.is_empty:
@@ -662,66 +671,78 @@ class OSDRemovalQueue(object):
         # self could change while this is processing (osds get added from the CLI)
         # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
         # osds that were added while this method was executed'
-        self.osds.intersection_update(new_queue)
-        self.save_to_store()
+        with self.lock:
+            self.osds.intersection_update(new_queue)
+            self._save_to_store()
 
     def cleanup(self) -> None:
         # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
-        for osd in self.not_in_cluster():
-            self.osds.remove(osd)
+        with self.lock:
+            for osd in self._not_in_cluster():
+                self.osds.remove(osd)
 
-    def save_to_store(self) -> None:
-        osd_queue = [osd.to_json() for osd in self.all_osds()]
+    def _save_to_store(self) -> None:
+        osd_queue = [osd.to_json() for osd in self.osds]
         logger.debug(f"Saving {osd_queue} to store")
         self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
 
     def load_from_store(self) -> None:
-        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
-            for osd in json.loads(v):
-                logger.debug(f"Loading osd ->{osd} from store")
-                osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
-                if osd_obj is not None:
-                    self.osds.add(osd_obj)
+        with self.lock:
+            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+                for osd in json.loads(v):
+                    logger.debug(f"Loading osd ->{osd} from store")
+                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
+                    if osd_obj is not None:
+                        self.osds.add(osd_obj)
 
     def as_osd_ids(self) -> List[int]:
-        return [osd.osd_id for osd in self.osds]
+        with self.lock:
+            return [osd.osd_id for osd in self.osds]
 
     def queue_size(self) -> int:
-        return len(self.osds)
+        with self.lock:
+            return len(self.osds)
 
     def draining_osds(self) -> List["OSD"]:
-        return [osd for osd in self.osds if osd.is_draining]
+        with self.lock:
+            return [osd for osd in self.osds if osd.is_draining]
 
     def idling_osds(self) -> List["OSD"]:
-        return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
+        with self.lock:
+            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
 
     def empty_osds(self) -> List["OSD"]:
-        return [osd for osd in self.osds if osd.is_empty]
+        with self.lock:
+            return [osd for osd in self.osds if osd.is_empty]
 
     def all_osds(self) -> List["OSD"]:
-        return [osd for osd in self.osds]
+        with self.lock:
+            return [osd for osd in self.osds]
 
-    def not_in_cluster(self) -> List["OSD"]:
+    def _not_in_cluster(self) -> List["OSD"]:
         return [osd for osd in self.osds if not osd.exists]
 
     def enqueue(self, osd: "OSD") -> None:
         if not osd.exists:
             raise NotFoundError()
-        self.osds.add(osd)
+        with self.lock:
+            self.osds.add(osd)
         osd.start()
 
     def rm(self, osd: "OSD") -> None:
         if not osd.exists:
             raise NotFoundError()
         osd.stop()
-        try:
-            logger.debug(f'Removing {osd} from the queue.')
-            self.osds.remove(osd)
-        except KeyError:
-            logger.debug(f"Could not find {osd} in queue.")
-            raise KeyError
+        with self.lock:
+            try:
+                logger.debug(f'Removing {osd} from the queue.')
+                self.osds.remove(osd)
+            except KeyError:
+                logger.debug(f"Could not find {osd} in queue.")
+                raise KeyError
 
     def __eq__(self, other: Any) -> bool:
         if not isinstance(other, OSDRemovalQueue):
             return False
-        return self.osds == other.osds
+        with self.lock:
+            return self.osds == other.osds