From 3b75d5d347d5dd8e0ee9c77fb6a1d31e29fc94ae Mon Sep 17 00:00:00 2001
From: Sebastian Wagner
Date: Fri, 8 Jan 2021 12:34:40 +0100
Subject: [PATCH] mgr/cephadm: move process_removal_queue into OSDRemovalQueue

`process_removal_queue` belongs to OSDRemovalQueue instead of RemoveUtil

Signed-off-by: Sebastian Wagner
(cherry picked from commit af52ba47a16703aa1aadbd59d84e8876bdad2f17)
---
 src/pybind/mgr/cephadm/module.py              |   6 +-
 src/pybind/mgr/cephadm/serve.py               |   2 +-
 src/pybind/mgr/cephadm/services/osd.py        | 171 +++++++++---
 src/pybind/mgr/cephadm/tests/test_cephadm.py  |   4 +-
 .../mgr/cephadm/tests/test_osd_removal.py     |  12 +-
 5 files changed, 97 insertions(+), 98 deletions(-)

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index 94367cb7bf589..d289e92735d78 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -354,8 +354,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.cache.load()
 
         self.rm_util = RemoveUtil(self)
-        self.to_remove_osds = OSDRemovalQueue()
-        self.rm_util.load_from_store()
+        self.to_remove_osds = OSDRemovalQueue(self)
+        self.to_remove_osds.load_from_store()
 
         self.spec_store = SpecStore(self)
         self.spec_store.load()
@@ -503,7 +503,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 self.log.debug(f"Found empty osd. Starting removal process")
                 # if the osd that is now empty is also part of the removal queue
                 # start the process
-                self.rm_util.process_removal_queue()
+                self._kick_serve_loop()
 
     def pause(self) -> None:
         if not self.paused:
diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py
index e1a90ee4314bf..7808dc5baded9 100644
--- a/src/pybind/mgr/cephadm/serve.py
+++ b/src/pybind/mgr/cephadm/serve.py
@@ -63,7 +63,7 @@ class CephadmServe:
             self._update_paused_health()
 
             if not self.mgr.paused:
-                self.mgr.rm_util.process_removal_queue()
+                self.mgr.to_remove_osds.process_removal_queue()
 
                 self.mgr.migration.migrate()
                 if self.mgr.migration.is_migration_ongoing():
diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
index 15f3bd04cb768..8f6d7ff511c6b 100644
--- a/src/pybind/mgr/cephadm/services/osd.py
+++ b/src/pybind/mgr/cephadm/services/osd.py
@@ -304,76 +304,6 @@ class RemoveUtil(object):
     def __init__(self, mgr: "CephadmOrchestrator") -> None:
         self.mgr: "CephadmOrchestrator" = mgr
 
-    def process_removal_queue(self) -> None:
-        """
-        Performs actions in the _serve() loop to remove an OSD
-        when criteria is met.
-        """
-
-        # make sure that we don't run on OSDs that are not in the cluster anymore.
-        self.cleanup()
-
-        logger.debug(
-            f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled "
-            f"for removal: {self.mgr.to_remove_osds.all_osds()}")
-
-        # find osds that are ok-to-stop and not yet draining
-        ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds())
-        if ok_to_stop_osds:
-            # start draining those
-            _ = [osd.start_draining() for osd in ok_to_stop_osds]
-
-        # Check all osds for their state and take action (remove, purge etc)
-        to_remove_osds = self.mgr.to_remove_osds.all_osds()
-        new_queue: Set[OSD] = set()
-        for osd in to_remove_osds:  # type: OSD
-            if not osd.force:
-                # skip criteria
-                if not osd.is_empty:
-                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
-                    new_queue.add(osd)
-                    continue
-
-                if not osd.safe_to_destroy():
-                    logger.info(
-                        f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
-                    new_queue.add(osd)
-                    continue
-
-                # abort criteria
-                if not osd.down():
-                    # also remove it from the remove_osd list and set a health_check warning?
-                    raise orchestrator.OrchestratorError(
-                        f"Could not set OSD <{osd.osd_id}> to 'down'")
-
-                if osd.replace:
-                    if not osd.destroy():
-                        raise orchestrator.OrchestratorError(
-                            f"Could not destroy OSD <{osd.osd_id}>")
-                else:
-                    if not osd.purge():
-                        raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
-
-            if not osd.exists:
-                continue
-            assert osd.fullname is not None
-            assert osd.hostname is not None
-            self.mgr._remove_daemon(osd.fullname, osd.hostname)
-            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
-            logger.debug(f"Removing {osd.osd_id} from the queue.")
-
-        # self.mgr.to_remove_osds could change while this is processing (osds get added from the CLI)
-        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
-        # osds that were added while this method was executed'
-        self.mgr.to_remove_osds.intersection_update(new_queue)
-        self.save_to_store()
-
-    def cleanup(self) -> None:
-        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
-        not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster()
-        for osd in not_in_cluster_osds:
-            self.mgr.to_remove_osds.remove(osd)
-
     def get_osds_in_cluster(self) -> List[str]:
         osd_map = self.mgr.get_osdmap()
         return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
@@ -481,19 +411,6 @@ class RemoveUtil(object):
         self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
         return True
 
-    def save_to_store(self) -> None:
-        osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()]
-        logger.debug(f"Saving {osd_queue} to store")
-        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
-
-    def load_from_store(self) -> None:
-        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
-            for osd in json.loads(v):
-                logger.debug(f"Loading osd ->{osd} from store")
-                osd_obj = OSD.from_json(osd, ctx=self)
-                if osd_obj is not None:
-                    self.mgr.to_remove_osds.add(osd_obj)
-
 
 class NotFoundError(Exception):
     pass
@@ -652,13 +569,13 @@ class OSD:
         return out
 
     @classmethod
-    def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]:
+    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
         if not inp:
             return None
         for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
             if inp.get(date_field):
                 inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
-        inp.update({'remove_util': ctx})
+        inp.update({'remove_util': rm_util})
         if 'nodename' in inp:
             hostname = inp.pop('nodename')
             inp['hostname'] = hostname
@@ -678,9 +595,91 @@ class OSD:
 
 
 class OSDRemovalQueue(object):
 
-    def __init__(self) -> None:
+    def __init__(self, mgr: "CephadmOrchestrator") -> None:
+        self.mgr: "CephadmOrchestrator" = mgr
         self.osds: Set[OSD] = set()
 
+    def process_removal_queue(self) -> None:
+        """
+        Performs actions in the _serve() loop to remove an OSD
+        when criteria is met.
+        """
+
+        # make sure that we don't run on OSDs that are not in the cluster anymore.
+        self.cleanup()
+
+        logger.debug(
+            f"{self.queue_size()} OSDs are scheduled "
+            f"for removal: {self.all_osds()}")
+
+        # find osds that are ok-to-stop and not yet draining
+        ok_to_stop_osds = self.mgr.rm_util.find_osd_stop_threshold(self.idling_osds())
+        if ok_to_stop_osds:
+            # start draining those
+            _ = [osd.start_draining() for osd in ok_to_stop_osds]
+
+        # Check all osds for their state and take action (remove, purge etc)
+        new_queue: Set[OSD] = set()
+        for osd in self.all_osds():  # type: OSD
+            if not osd.force:
+                # skip criteria
+                if not osd.is_empty:
+                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+                    new_queue.add(osd)
+                    continue
+
+                if not osd.safe_to_destroy():
+                    logger.info(
+                        f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+                    new_queue.add(osd)
+                    continue
+
+                # abort criteria
+                if not osd.down():
+                    # also remove it from the remove_osd list and set a health_check warning?
+                    raise orchestrator.OrchestratorError(
+                        f"Could not set OSD <{osd.osd_id}> to 'down'")
+
+                if osd.replace:
+                    if not osd.destroy():
+                        raise orchestrator.OrchestratorError(
+                            f"Could not destroy OSD <{osd.osd_id}>")
+                else:
+                    if not osd.purge():
+                        raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+
+            if not osd.exists:
+                continue
+            assert osd.fullname is not None
+            assert osd.hostname is not None
+            self.mgr._remove_daemon(osd.fullname, osd.hostname)
+            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
+            logger.debug(f"Removing {osd.osd_id} from the queue.")
+
+        # self could change while this is processing (osds get added from the CLI)
+        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
+        # osds that were added while this method was executed'
+        self.intersection_update(new_queue)
+        self.save_to_store()
+
+    def cleanup(self) -> None:
+        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
+        for osd in self.not_in_cluster():
+            self.remove(osd)
+
+    def save_to_store(self) -> None:
+        osd_queue = [osd.to_json() for osd in self.all_osds()]
+        logger.debug(f"Saving {osd_queue} to store")
+        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
+
+    def load_from_store(self) -> None:
+        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+            for osd in json.loads(v):
+                logger.debug(f"Loading osd ->{osd} from store")
+                osd_obj = OSD.from_json(osd, rm_util=self.mgr.rm_util)
+                if osd_obj is not None:
+                    self.osds.add(osd_obj)
+
     def as_osd_ids(self) -> List[int]:
         return [osd.osd_id for osd in self.osds]
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index 2a26aa47c6dfa..0b1600c2ad0c1 100644
--- a/src/pybind/mgr/cephadm/tests/test_cephadm.py
+++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py
@@ -507,8 +507,8 @@ class TestCephadm(object):
                 process_started_at=datetime.datetime.utcnow(),
                 remove_util=cephadm_module.rm_util
             ))
-            cephadm_module.rm_util.process_removal_queue()
-            assert cephadm_module.to_remove_osds == OSDRemovalQueue()
+            cephadm_module.to_remove_osds.process_removal_queue()
+            assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module)
 
             c = cephadm_module.remove_osds_status()
             out = wait(cephadm_module, c)
diff --git a/src/pybind/mgr/cephadm/tests/test_osd_removal.py b/src/pybind/mgr/cephadm/tests/test_osd_removal.py
index 76f067bb55a43..9fd5f9b760ea7 100644
--- a/src/pybind/mgr/cephadm/tests/test_osd_removal.py
+++ b/src/pybind/mgr/cephadm/tests/test_osd_removal.py
@@ -90,9 +90,9 @@ class TestOSDRemoval:
             }
         ])
         cephadm_module.set_store('osd_remove_queue', data)
-        cephadm_module.rm_util.load_from_store()
+        cephadm_module.to_remove_osds.load_from_store()
 
-        expected = OSDRemovalQueue()
+        expected = OSDRemovalQueue(cephadm_module)
         expected.add(OSD(osd_id=35, remove_util=rm_util, draining=True))
         assert cephadm_module.to_remove_osds == expected
@@ -218,7 +218,7 @@ class TestOSD:
 
 class TestOSDRemovalQueue:
     def test_queue_size(self, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         assert q.queue_size() == 0
         q.add(osd_obj)
         assert q.queue_size() == 1
@@ -226,14 +226,14 @@ class TestOSDRemovalQueue:
     @mock.patch("cephadm.services.osd.OSD.start")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_enqueue(self, exist, start, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         q.enqueue(osd_obj)
         osd_obj.start.assert_called_once()
 
     @mock.patch("cephadm.services.osd.OSD.stop")
    @mock.patch("cephadm.services.osd.OSD.exists")
     def test_rm_raise(self, exist, stop, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         with pytest.raises(KeyError):
             q.rm(osd_obj)
         osd_obj.stop.assert_called_once()
@@ -241,7 +241,7 @@ class TestOSDRemovalQueue:
     @mock.patch("cephadm.services.osd.OSD.stop")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_rm(self, exist, stop, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         q.add(osd_obj)
         q.rm(osd_obj)
         osd_obj.stop.assert_called_once()
-- 
2.39.5
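
Usage sketch (not part of the patch): after this change, OSDRemovalQueue owns the mgr
reference, its own persistence (save_to_store/load_from_store) and the removal state
machine, while RemoveUtil keeps only the low-level helpers such as
find_osd_stop_threshold. Below is a minimal sketch of the new call pattern, mirroring
the OSDRemovalQueue(mock.Mock()) style used in test_osd_removal.py above. It assumes a
cephadm development environment where cephadm.services.osd is importable, and that the
queue helpers not shown in this diff (not_in_cluster, queue_size, all_osds, idling_osds,
intersection_update) are simple wrappers around the internal OSD set, as the rest of
the class suggests:

    from unittest import mock

    from cephadm.services.osd import OSDRemovalQueue

    # Stand-in for CephadmOrchestrator, configured just enough
    # for one pass over an empty queue.
    mgr = mock.Mock()
    mgr.get_store_prefix.return_value = {}                  # no persisted 'osd_remove_queue'
    mgr.rm_util.find_osd_stop_threshold.return_value = []   # no OSD is ok-to-stop

    queue = OSDRemovalQueue(mgr)   # the queue now takes the orchestrator itself
    queue.load_from_store()        # persistence moved here from RemoveUtil
    queue.process_removal_queue()  # one serve-loop pass of the removal state machine

    # The pass ends by persisting the (still empty) queue via the mgr:
    mgr.set_store.assert_called_with('osd_remove_queue', '[]')

This also shows why module.py now calls self._kick_serve_loop() instead of running the
queue directly: processing happens only inside the serve loop, under its scheduling,
rather than from the notify path.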