self.cache = HostCache(self)
self.cache.load()
- self.rm_util = RemoveUtil(self)
self.to_remove_osds = OSDRemovalQueue(self)
self.to_remove_osds.load_from_store()
hostname=daemon.hostname,
fullname=daemon.name(),
process_started_at=datetime.datetime.utcnow(),
- remove_util=self.rm_util))
+ remove_util=self.to_remove_osds.rm_util))
except NotFoundError:
return f"Unable to find OSDs: {osd_ids}"
for osd_id in osd_ids:
try:
self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
- remove_util=self.rm_util))
+ remove_util=self.to_remove_osds.rm_util))
except (NotFoundError, KeyError):
return f'Unable to find OSD in the queue: {osd_id}'
def __init__(self, mgr: "CephadmOrchestrator") -> None:
self.mgr: "CephadmOrchestrator" = mgr
self.osds: Set[OSD] = set()
+ self.rm_util = RemoveUtil(mgr)
def process_removal_queue(self) -> None:
"""
f"for removal: {self.all_osds()}")
# find osds that are ok-to-stop and not yet draining
- ok_to_stop_osds = self.mgr.rm_util.find_osd_stop_threshold(self.idling_osds())
+ ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
if ok_to_stop_osds:
# start draining those
_ = [osd.start_draining() for osd in ok_to_stop_osds]
for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
for osd in json.loads(v):
logger.debug(f"Loading osd ->{osd} from store")
- osd_obj = OSD.from_json(osd, rm_util=self.mgr.rm_util)
+ osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
if osd_obj is not None:
self.osds.add(osd_obj)
hostname='test',
fullname='osd.0',
process_started_at=datetime.datetime.utcnow(),
- remove_util=cephadm_module.rm_util
+ remove_util=cephadm_module.to_remove_osds.rm_util
))
cephadm_module.to_remove_osds.process_removal_queue()
assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module)