def __init__(self, mgr: "CephadmOrchestrator") -> None:
self.mgr: "CephadmOrchestrator" = mgr
- def process_removal_queue(self) -> None:
- """
- Performs actions in the _serve() loop to remove an OSD
- when criteria is met.
- """
-
- # make sure that we don't run on OSDs that are not in the cluster anymore.
- self.cleanup()
-
- logger.debug(
- f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled "
- f"for removal: {self.mgr.to_remove_osds.all_osds()}")
-
- # find osds that are ok-to-stop and not yet draining
- ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds())
- if ok_to_stop_osds:
- # start draining those
- _ = [osd.start_draining() for osd in ok_to_stop_osds]
-
- # Check all osds for their state and take action (remove, purge etc)
- to_remove_osds = self.mgr.to_remove_osds.all_osds()
- new_queue: Set[OSD] = set()
- for osd in to_remove_osds: # type: OSD
- if not osd.force:
- # skip criteria
- if not osd.is_empty:
- logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
- new_queue.add(osd)
- continue
-
- if not osd.safe_to_destroy():
- logger.info(
- f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
- new_queue.add(osd)
- continue
-
- # abort criteria
- if not osd.down():
- # also remove it from the remove_osd list and set a health_check warning?
- raise orchestrator.OrchestratorError(
- f"Could not set OSD <{osd.osd_id}> to 'down'")
-
- if osd.replace:
- if not osd.destroy():
- raise orchestrator.OrchestratorError(
- f"Could not destroy OSD <{osd.osd_id}>")
- else:
- if not osd.purge():
- raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
-
- if not osd.exists:
- continue
- assert osd.fullname is not None
- assert osd.hostname is not None
- self.mgr._remove_daemon(osd.fullname, osd.hostname)
- logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
- logger.debug(f"Removing {osd.osd_id} from the queue.")
-
- # self.mgr.to_remove_osds could change while this is processing (osds get added from the CLI)
- # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
- # osds that were added while this method was executed'
- self.mgr.to_remove_osds.intersection_update(new_queue)
- self.save_to_store()
-
- def cleanup(self) -> None:
- # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
- not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster()
- for osd in not_in_cluster_osds:
- self.mgr.to_remove_osds.remove(osd)
-
def get_osds_in_cluster(self) -> List[str]:
osd_map = self.mgr.get_osdmap()
return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
return True
- def save_to_store(self) -> None:
- osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()]
- logger.debug(f"Saving {osd_queue} to store")
- self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
-
- def load_from_store(self) -> None:
- for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
- for osd in json.loads(v):
- logger.debug(f"Loading osd ->{osd} from store")
- osd_obj = OSD.from_json(osd, ctx=self)
- if osd_obj is not None:
- self.mgr.to_remove_osds.add(osd_obj)
-
class NotFoundError(Exception):
pass
@classmethod
- def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]:
+ def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
if not inp:
return None
for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
if inp.get(date_field):
inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
- inp.update({'remove_util': ctx})
+ inp.update({'remove_util': rm_util})
if 'nodename' in inp:
hostname = inp.pop('nodename')
inp['hostname'] = hostname
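
With this change, callers pass the RemoveUtil explicitly rather than an opaque ctx. A minimal sketch of the new call shape (the field names are taken from the tests further down, not from this hunk):

    osd = OSD.from_json({'osd_id': 35, 'draining': True}, rm_util=mgr.rm_util)
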
class OSDRemovalQueue(object):
- def __init__(self) -> None:
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr: "CephadmOrchestrator" = mgr
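+        # OSDs that are queued for removal; persisted in the mgr key/value store
+        # via save_to_store() / load_from_store()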
self.osds: Set[OSD] = set()
+ def process_removal_queue(self) -> None:
+ """
+ Performs actions in the _serve() loop to remove an OSD
+        when its removal criteria are met.
+ """
+
+ # make sure that we don't run on OSDs that are not in the cluster anymore.
+ self.cleanup()
+
+ logger.debug(
+ f"{self.queue_size()} OSDs are scheduled "
+ f"for removal: {self.all_osds()}")
+
+ # find osds that are ok-to-stop and not yet draining
+ ok_to_stop_osds = self.mgr.rm_util.find_osd_stop_threshold(self.idling_osds())
+ if ok_to_stop_osds:
+ # start draining those
+            for osd in ok_to_stop_osds:
+                osd.start_draining()
+
+ # Check all osds for their state and take action (remove, purge etc)
+ new_queue: Set[OSD] = set()
+ for osd in self.all_osds(): # type: OSD
+ if not osd.force:
+ # skip criteria
+ if not osd.is_empty:
+ logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+ new_queue.add(osd)
+ continue
+
+ if not osd.safe_to_destroy():
+ logger.info(
+ f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+ new_queue.add(osd)
+ continue
+
+ # abort criteria
+ if not osd.down():
+ # also remove it from the remove_osd list and set a health_check warning?
+ raise orchestrator.OrchestratorError(
+ f"Could not set OSD <{osd.osd_id}> to 'down'")
+
+ if osd.replace:
+ if not osd.destroy():
+ raise orchestrator.OrchestratorError(
+ f"Could not destroy OSD <{osd.osd_id}>")
+ else:
+ if not osd.purge():
+ raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+
+ if not osd.exists:
+ continue
+ assert osd.fullname is not None
+ assert osd.hostname is not None
+ self.mgr._remove_daemon(osd.fullname, osd.hostname)
+ logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
+ logger.debug(f"Removing {osd.osd_id} from the queue.")
+
+        # self.osds can change while this is processing (OSDs get added from the CLI),
+        # so the resulting queue is the intersection of the OSDs that are still not
+        # empty/removed (new_queue) with whatever the queue holds by now.
+ self.intersection_update(new_queue)
+ self.save_to_store()
+
+ def cleanup(self) -> None:
+        # OSDs can always be cleaned up manually; this ensures we only operate on
+        # OSDs that still exist in the cluster.
+ for osd in self.not_in_cluster():
+ self.remove(osd)
+
+ def save_to_store(self) -> None:
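+        # Persist the current queue as JSON in the mgr key/value store so that
+        # pending removals can be picked up again by load_from_store().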
+ osd_queue = [osd.to_json() for osd in self.all_osds()]
+ logger.debug(f"Saving {osd_queue} to store")
+ self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
+
+ def load_from_store(self) -> None:
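+        # Rebuild the queue from the mgr key/value store (counterpart of save_to_store()).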
+ for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+ for osd in json.loads(v):
+ logger.debug(f"Loading osd ->{osd} from store")
+ osd_obj = OSD.from_json(osd, rm_util=self.mgr.rm_util)
+ if osd_obj is not None:
+ self.osds.add(osd_obj)
+
def as_osd_ids(self) -> List[int]:
return [osd.osd_id for osd in self.osds]
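
With the processing logic now owned by the queue itself, the orchestrator's call site reduces to a single call on the queue. That call site is not part of this excerpt, so the following is only a sketch of what it would presumably look like after the move:

    # inside the cephadm serve loop (previously routed through the RemoveUtil
    # instance, e.g. self.rm_util.process_removal_queue())
    self.to_remove_osds.process_removal_queue()
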
}
])
cephadm_module.set_store('osd_remove_queue', data)
- cephadm_module.rm_util.load_from_store()
+ cephadm_module.to_remove_osds.load_from_store()
- expected = OSDRemovalQueue()
+ expected = OSDRemovalQueue(cephadm_module)
expected.add(OSD(osd_id=35, remove_util=rm_util, draining=True))
assert cephadm_module.to_remove_osds == expected
class TestOSDRemovalQueue:
def test_queue_size(self, osd_obj):
- q = OSDRemovalQueue()
+ q = OSDRemovalQueue(mock.Mock())
assert q.queue_size() == 0
q.add(osd_obj)
assert q.queue_size() == 1
@mock.patch("cephadm.services.osd.OSD.start")
@mock.patch("cephadm.services.osd.OSD.exists")
def test_enqueue(self, exist, start, osd_obj):
- q = OSDRemovalQueue()
+ q = OSDRemovalQueue(mock.Mock())
q.enqueue(osd_obj)
osd_obj.start.assert_called_once()
@mock.patch("cephadm.services.osd.OSD.stop")
@mock.patch("cephadm.services.osd.OSD.exists")
def test_rm_raise(self, exist, stop, osd_obj):
- q = OSDRemovalQueue()
+ q = OSDRemovalQueue(mock.Mock())
with pytest.raises(KeyError):
q.rm(osd_obj)
osd_obj.stop.assert_called_once()
@mock.patch("cephadm.services.osd.OSD.stop")
@mock.patch("cephadm.services.osd.OSD.exists")
def test_rm(self, exist, stop, osd_obj):
- q = OSDRemovalQueue()
+ q = OSDRemovalQueue(mock.Mock())
q.add(osd_obj)
q.rm(osd_obj)
osd_obj.stop.assert_called_once()
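
In the same style, a small extra check could exercise as_osd_ids() through the new constructor; this is only a sketch (it is not part of the change and reuses the osd_obj fixture from the tests above):

    def test_as_osd_ids(self, osd_obj):
        q = OSDRemovalQueue(mock.Mock())
        q.add(osd_obj)
        assert q.as_osd_ids() == [osd_obj.osd_id]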