import json
import logging
+from threading import Lock
from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional, TYPE_CHECKING
from ceph.deployment import translate
self.osds: Set[OSD] = set()
self.rm_util = RemoveUtil(mgr)
+ # locks multithreaded access to self.osds. Please avoid locking
+ # network calls, like mon commands.
+ self.lock = Lock()
+
def process_removal_queue(self) -> None:
"""
Performs actions in the _serve() loop to remove an OSD
when criteria is met.
+
+ we can't hold self.lock, as we're calling _remove_daemon in the loop
"""
# make sure that we don't run on OSDs that are not in the cluster anymore.
self.cleanup()
- logger.debug(
- f"{self.queue_size()} OSDs are scheduled "
- f"for removal: {self.all_osds()}")
-
# find osds that are ok-to-stop and not yet draining
ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
if ok_to_stop_osds:
# start draining those
_ = [osd.start_draining() for osd in ok_to_stop_osds]
+ all_osds = self.all_osds()
+
+ logger.debug(
+ f"{self.queue_size()} OSDs are scheduled "
+ f"for removal: {all_osds}")
+
# Check all osds for their state and take action (remove, purge etc)
new_queue: Set[OSD] = set()
- for osd in self.all_osds(): # type: OSD
+ for osd in all_osds: # type: OSD
if not osd.force:
# skip criteria
if not osd.is_empty:
# self could change while this is processing (osds get added from the CLI)
# The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
# osds that were added while this method was executed'
- self.osds.intersection_update(new_queue)
- self.save_to_store()
+ with self.lock:
+ self.osds.intersection_update(new_queue)
+ self._save_to_store()
def cleanup(self) -> None:
# OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
- for osd in self.not_in_cluster():
- self.osds.remove(osd)
+ with self.lock:
+ for osd in self._not_in_cluster():
+ self.osds.remove(osd)
- def save_to_store(self) -> None:
- osd_queue = [osd.to_json() for osd in self.all_osds()]
+ def _save_to_store(self) -> None:
+ osd_queue = [osd.to_json() for osd in self.osds]
logger.debug(f"Saving {osd_queue} to store")
self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
def load_from_store(self) -> None:
- for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
- for osd in json.loads(v):
- logger.debug(f"Loading osd ->{osd} from store")
- osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
- if osd_obj is not None:
- self.osds.add(osd_obj)
+ with self.lock:
+ for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+ for osd in json.loads(v):
+ logger.debug(f"Loading osd ->{osd} from store")
+ osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
+ if osd_obj is not None:
+ self.osds.add(osd_obj)
def as_osd_ids(self) -> List[int]:
- return [osd.osd_id for osd in self.osds]
+ with self.lock:
+ return [osd.osd_id for osd in self.osds]
def queue_size(self) -> int:
- return len(self.osds)
+ with self.lock:
+ return len(self.osds)
def draining_osds(self) -> List["OSD"]:
- return [osd for osd in self.osds if osd.is_draining]
+ with self.lock:
+ return [osd for osd in self.osds if osd.is_draining]
def idling_osds(self) -> List["OSD"]:
- return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
+ with self.lock:
+ return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
def empty_osds(self) -> List["OSD"]:
- return [osd for osd in self.osds if osd.is_empty]
+ with self.lock:
+ return [osd for osd in self.osds if osd.is_empty]
def all_osds(self) -> List["OSD"]:
- return [osd for osd in self.osds]
+ with self.lock:
+ return [osd for osd in self.osds]
- def not_in_cluster(self) -> List["OSD"]:
+ def _not_in_cluster(self) -> List["OSD"]:
return [osd for osd in self.osds if not osd.exists]
def enqueue(self, osd: "OSD") -> None:
if not osd.exists:
raise NotFoundError()
- self.osds.add(osd)
+ with self.lock:
+ self.osds.add(osd)
osd.start()
def rm(self, osd: "OSD") -> None:
if not osd.exists:
raise NotFoundError()
osd.stop()
- try:
- logger.debug(f'Removing {osd} from the queue.')
- self.osds.remove(osd)
- except KeyError:
- logger.debug(f"Could not find {osd} in queue.")
- raise KeyError
+ with self.lock:
+ try:
+ logger.debug(f'Removing {osd} from the queue.')
+ self.osds.remove(osd)
+ except KeyError:
+ logger.debug(f"Could not find {osd} in queue.")
+ raise KeyError
def __eq__(self, other: Any) -> bool:
if not isinstance(other, OSDRemovalQueue):
return False
- return self.osds == other.osds
+ with self.lock:
+ return self.osds == other.osds