# Check all osds for their state and take action (remove, purge etc)
to_remove_osds = self.mgr.to_remove_osds.all_osds()
- new_queue = set()
- for osd in to_remove_osds:
+ new_queue: Set[OSD] = set()
+ for osd in to_remove_osds: # type: OSD
if not osd.force:
# skip criteria
if not osd.is_empty:
for osd in json.loads(v):
logger.debug(f"Loading osd ->{osd} from store")
osd_obj = OSD.from_json(osd, ctx=self)
- self.mgr.to_remove_osds.add(osd_obj)
+ if osd_obj is not None:
+ self.mgr.to_remove_osds.add(osd_obj)
class NotFoundError(Exception):
self.fullname = fullname
# mgr obj to make mgr/mon calls
- self.rm_util = remove_util
+ self.rm_util: RemoveUtil = remove_util
def start(self) -> None:
if self.started:
return f"<OSD>(osd_id={self.osd_id}, draining={self.draining})"
-class OSDRemovalQueue(Set):
+class OSDRemovalQueue(object):
def __init__(self) -> None:
- super().__init__()
+ self.osds: Set[OSD] = set()
def as_osd_ids(self) -> List[int]:
- return [osd.osd_id for osd in self]
+ return [osd.osd_id for osd in self.osds]
def queue_size(self) -> int:
- return len(self)
+ return len(self.osds)
def draining_osds(self) -> List["OSD"]:
- return [osd for osd in self if osd.is_draining]
+ return [osd for osd in self.osds if osd.is_draining]
def idling_osds(self) -> List["OSD"]:
- return [osd for osd in self if not osd.is_draining and not osd.is_empty]
+ return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]
def empty_osds(self) -> List["OSD"]:
- return [osd for osd in self if osd.is_empty]
+ return [osd for osd in self.osds if osd.is_empty]
def all_osds(self) -> List["OSD"]:
- return [osd for osd in self]
+ return [osd for osd in self.osds]
def not_in_cluster(self) -> List["OSD"]:
- return [osd for osd in self if not osd.exists]
+ return [osd for osd in self.osds if not osd.exists]
def enqueue(self, osd: "OSD") -> None:
if not osd.exists:
raise NotFoundError()
- self.add(osd)
+ self.osds.add(osd)
osd.start()
def rm(self, osd: "OSD") -> None:
osd.stop()
try:
logger.debug(f'Removing {osd} from the queue.')
- self.remove(osd)
+ self.osds.remove(osd)
except KeyError:
logger.debug(f"Could not find {osd} in queue.")
raise KeyError
+
+ def remove(self, osd: OSD) -> None:
+ self.osds.remove(osd)
+
+ def add(self, osd: OSD) -> None:
+ self.osds.add(osd)
+
+ def intersection_update(self, other: Set[OSD]) -> None:
+ self.osds.intersection_update(other)
+
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, OSDRemovalQueue):
+ return False
+ return self.osds == other.osds
rm_util._run_mon_cmd.assert_called_with(
{'prefix': 'osd purge-actual', 'id': 1, 'yes_i_really_mean_it': True})
- def test_load(self, cephadm_module):
+ def test_load(self, cephadm_module, rm_util):
data = json.dumps([
{
"osd_id": 35,
cephadm_module.set_store('osd_remove_queue', data)
cephadm_module.rm_util.load_from_store()
- assert repr(
- cephadm_module.to_remove_osds) == 'OSDRemovalQueue({<OSD>(osd_id=35, draining=True)})'
+ expected = OSDRemovalQueue()
+ expected.add(OSD(osd_id=35, remove_util=rm_util, draining=True))
+ assert cephadm_module.to_remove_osds == expected
class TestOSD: