From db7c6a56125a3a36fa50904557280d69829c69d6 Mon Sep 17 00:00:00 2001
From: Sebastian Wagner
Date: Fri, 8 Jan 2021 13:54:46 +0100
Subject: [PATCH] mgr/cephadm: lock multithreaded access to OSDRemovalQueue

The set can also be changed from the CLI thread.

Fixes: https://tracker.ceph.com/issues/47700
Signed-off-by: Sebastian Wagner
---
 src/pybind/mgr/cephadm/services/osd.py | 85 ++++++++++++++++----------
 1 file changed, 53 insertions(+), 32 deletions(-)

diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
index a8c589e489ea..5516b575273b 100644
--- a/src/pybind/mgr/cephadm/services/osd.py
+++ b/src/pybind/mgr/cephadm/services/osd.py
@@ -1,5 +1,6 @@
 import json
 import logging
+from threading import Lock
 from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional, TYPE_CHECKING
 
 from ceph.deployment import translate
@@ -602,28 +603,36 @@ class OSDRemovalQueue(object):
         self.osds: Set[OSD] = set()
         self.rm_util = RemoveUtil(mgr)
 
+        # locks multithreaded access to self.osds. Please avoid locking
+        # network calls, like mon commands.
+        self.lock = Lock()
+
     def process_removal_queue(self) -> None:
         """
         Performs actions in the _serve() loop to remove an OSD
         when criteria is met.
+
+        we can't hold self.lock, as we're calling _remove_daemon in the loop
         """
         # make sure that we don't run on OSDs that are not in the cluster anymore.
         self.cleanup()
 
-        logger.debug(
-            f"{self.queue_size()} OSDs are scheduled "
-            f"for removal: {self.all_osds()}")
-
         # find osds that are ok-to-stop and not yet draining
         ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
         if ok_to_stop_osds:
             # start draining those
             _ = [osd.start_draining() for osd in ok_to_stop_osds]
 
+        all_osds = self.all_osds()
+
+        logger.debug(
+            f"{self.queue_size()} OSDs are scheduled "
+            f"for removal: {all_osds}")
+
         # Check all osds for their state and take action (remove, purge etc)
         new_queue: Set[OSD] = set()
-        for osd in self.all_osds():  # type: OSD
+        for osd in all_osds:  # type: OSD
             if not osd.force:
                 # skip criteria
                 if not osd.is_empty:
@@ -662,66 +671,78 @@ class OSDRemovalQueue(object):
         # self could change while this is processing (osds get added from the CLI)
         # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
         # osds that were added while this method was executed'
-        self.osds.intersection_update(new_queue)
-        self.save_to_store()
+        with self.lock:
+            self.osds.intersection_update(new_queue)
+            self._save_to_store()
 
     def cleanup(self) -> None:
         # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
-        for osd in self.not_in_cluster():
-            self.osds.remove(osd)
+        with self.lock:
+            for osd in self._not_in_cluster():
+                self.osds.remove(osd)
 
-    def save_to_store(self) -> None:
-        osd_queue = [osd.to_json() for osd in self.all_osds()]
+    def _save_to_store(self) -> None:
+        osd_queue = [osd.to_json() for osd in self.osds]
         logger.debug(f"Saving {osd_queue} to store")
         self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
 
     def load_from_store(self) -> None:
-        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
-            for osd in json.loads(v):
-                logger.debug(f"Loading osd ->{osd} from store")
-                osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
-                if osd_obj is not None:
-                    self.osds.add(osd_obj)
+        with self.lock:
+            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+                for osd in json.loads(v):
+                    logger.debug(f"Loading osd ->{osd} from store")
+                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
+                    if osd_obj is not None:
+                        self.osds.add(osd_obj)
 
     def as_osd_ids(self) -> List[int]:
-        return [osd.osd_id for osd in self.osds]
+        with self.lock:
+            return [osd.osd_id for osd in self.osds]
 
     def queue_size(self) -> int:
-        return len(self.osds)
+        with self.lock:
+            return len(self.osds)
 
     def draining_osds(self) -> List["OSD"]:
-        return [osd for osd in self.osds if osd.is_draining]
+        with self.lock:
+            return [osd for osd in self.osds if osd.is_draining]
 
     def idling_osds(self) -> List["OSD"]:
-        return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
+        with self.lock:
+            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
 
     def empty_osds(self) -> List["OSD"]:
-        return [osd for osd in self.osds if osd.is_empty]
+        with self.lock:
+            return [osd for osd in self.osds if osd.is_empty]
 
     def all_osds(self) -> List["OSD"]:
-        return [osd for osd in self.osds]
+        with self.lock:
+            return [osd for osd in self.osds]
 
-    def not_in_cluster(self) -> List["OSD"]:
+    def _not_in_cluster(self) -> List["OSD"]:
         return [osd for osd in self.osds if not osd.exists]
 
     def enqueue(self, osd: "OSD") -> None:
         if not osd.exists:
             raise NotFoundError()
-        self.osds.add(osd)
+        with self.lock:
+            self.osds.add(osd)
         osd.start()
 
     def rm(self, osd: "OSD") -> None:
         if not osd.exists:
             raise NotFoundError()
        osd.stop()
-        try:
-            logger.debug(f'Removing {osd} from the queue.')
-            self.osds.remove(osd)
-        except KeyError:
-            logger.debug(f"Could not find {osd} in queue.")
-            raise KeyError
+        with self.lock:
+            try:
+                logger.debug(f'Removing {osd} from the queue.')
+                self.osds.remove(osd)
+            except KeyError:
+                logger.debug(f"Could not find {osd} in queue.")
+                raise KeyError
 
     def __eq__(self, other: Any) -> bool:
         if not isinstance(other, OSDRemovalQueue):
             return False
-        return self.osds == other.osds
+        with self.lock:
+            return self.osds == other.osds
-- 
2.47.3
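
The concurrency pattern the patch applies is worth spelling out: copy the shared set while holding the lock (all_osds()), run the slow per-OSD work (mon commands, daemon removal) with the lock released, then reconcile under the lock via intersection_update(). The sketch below is a minimal, self-contained illustration of that pattern, not the cephadm code itself; SharedQueue, snapshot(), and _is_done() are hypothetical names used only for this example.

import threading
from typing import List, Set


class SharedQueue:
    """Illustrative queue shared between a worker loop and CLI threads
    (hypothetical stand-in, not OSDRemovalQueue itself)."""

    def __init__(self) -> None:
        self.items: Set[int] = set()
        # Guards self.items only; never held across slow (network) calls.
        self.lock = threading.Lock()

    def enqueue(self, item: int) -> None:
        # Called from CLI threads; blocks only for the set insertion.
        with self.lock:
            self.items.add(item)

    def snapshot(self) -> List[int]:
        # Copy under the lock so the caller can iterate without holding it.
        with self.lock:
            return list(self.items)

    def process(self) -> None:
        # Worker loop: snapshot, do the slow work unlocked, then intersect.
        still_pending: Set[int] = set()
        for item in self.snapshot():
            if not self._is_done(item):  # slow check, e.g. a mon command
                still_pending.add(item)
        with self.lock:
            # Keep only items that are still pending *and* still in the set,
            # so a concurrent rm()-style removal by a CLI thread sticks.
            self.items.intersection_update(still_pending)

    def _is_done(self, item: int) -> bool:
        return False  # placeholder for a slow, network-backed check

As with the comment the patch adds in __init__, the invariant is that the lock protects only the in-memory set: mon commands and daemon removal happen outside it, so a CLI thread calling enqueue() is never blocked behind a network round trip.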