From ea507d6c86f6a6ed5850edf24116e12088b24036 Mon Sep 17 00:00:00 2001 From: Cory Snyder Date: Fri, 10 Sep 2021 06:59:35 -0400 Subject: [PATCH] mgr/orchestrator: add --zap flag to 'orch osd rm' Adds the ability to zap OSD devices after removal, implemented as a flag on the 'orch osd rm' command. Fixes: https://tracker.ceph.com/issues/43692 Signed-off-by: Cory Snyder --- .../orch/cephadm/osds/2-ops/rm-zap-flag.yaml | 15 ++++++ src/pybind/mgr/cephadm/inventory.py | 7 +++ src/pybind/mgr/cephadm/module.py | 11 +++- src/pybind/mgr/cephadm/services/osd.py | 54 ++++++++++++++++--- .../mgr/cephadm/tests/test_osd_removal.py | 29 ++++++++++ src/pybind/mgr/orchestrator/_interface.py | 4 +- src/pybind/mgr/orchestrator/module.py | 5 +- src/pybind/mgr/rook/module.py | 4 +- 8 files changed, 118 insertions(+), 11 deletions(-) create mode 100644 qa/suites/orch/cephadm/osds/2-ops/rm-zap-flag.yaml diff --git a/qa/suites/orch/cephadm/osds/2-ops/rm-zap-flag.yaml b/qa/suites/orch/cephadm/osds/2-ops/rm-zap-flag.yaml new file mode 100644 index 00000000000..8f07f6d5374 --- /dev/null +++ b/qa/suites/orch/cephadm/osds/2-ops/rm-zap-flag.yaml @@ -0,0 +1,15 @@ +tasks: +- cephadm.shell: + host.a: + - | + set -e + set -x + ceph orch ps + ceph orch device ls + DEVID=$(ceph device ls | grep osd.1 | awk '{print $1}') + HOST=$(ceph orch device ls | grep "$DEVID" | awk '{print $1}') + DEV=$(ceph orch device ls | grep "$DEVID" | awk '{print $2}') + echo "host $HOST, dev $DEV, devid $DEVID" + ceph orch osd rm --zap --replace 1 + while ceph orch osd rm status | grep ^1 ; do sleep 5 ; done + while ! 
ceph osd dump | grep osd.1 | grep "up\s*in" ; do sleep 5 ; done diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py index 67ccc612ff7..06f2494c2aa 100644 --- a/src/pybind/mgr/cephadm/inventory.py +++ b/src/pybind/mgr/cephadm/inventory.py @@ -828,6 +828,13 @@ class HostCache(): return dd raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)') + def has_daemon(self, daemon_name: str) -> bool: + try: + self.get_daemon(daemon_name) + except orchestrator.OrchestratorError: + return False + return True + def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]: def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription: dd = copy(dd_orig) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 711ddc69b54..5b82e625a29 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -361,6 +361,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, default=4721, desc='First port agent will try to bind to (will also try up to next 1000 subsequent ports if blocked)' ), + Option( + 'max_osd_draining_count', + type='int', + default=10, + desc='max number of osds that will be drained simultaneously when osds are removed' + ), ] def __init__(self, *args: Any, **kwargs: Any): @@ -426,6 +432,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.endpoint_port = 0 self.agent_starting_port = 0 self.apply_spec_fails: List[Tuple[str, str]] = [] + self.max_osd_draining_count = 10 self.notify('mon_map', None) self.config_notify() @@ -2591,7 +2598,8 @@ Then run the following: @handle_orch_error def remove_osds(self, osd_ids: List[str], replace: bool = False, - force: bool = False) -> str: + force: bool = False, + zap: bool = False) -> str: """ Takes a list of OSDs and schedules them for removal. 
The function that takes care of the actual removal is @@ -2612,6 +2620,7 @@ Then run the following: self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id), replace=replace, force=force, + zap=zap, hostname=daemon.hostname, process_started_at=datetime_now(), remove_util=self.to_remove_osds.rm_util)) diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py index f5996eeae76..94cb6fa076d 100644 --- a/src/pybind/mgr/cephadm/services/osd.py +++ b/src/pybind/mgr/cephadm/services/osd.py @@ -467,6 +467,19 @@ class RemoveUtil(object): self.mgr.log.info(f"{osd} weight is now {weight}") return True + def zap_osd(self, osd: "OSD") -> str: + "Zaps all devices that are associated with an OSD" + if osd.hostname is not None: + out, err, code = CephadmServe(self.mgr)._run_cephadm( + osd.hostname, 'osd', 'ceph-volume', + ['--', 'lvm', 'zap', '--destroy', '--osd-id', str(osd.osd_id)], + error_ok=True) + self.mgr.cache.invalidate_host_devices(osd.hostname) + if code: + raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) + return '\n'.join(out + err) + raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown") + def safe_to_destroy(self, osd_ids: List[int]) -> bool: """ Queries the safe-to-destroy flag for OSDs """ cmd_args = {'prefix': 'osd safe-to-destroy', @@ -521,7 +534,7 @@ class OSD: replace: bool = False, force: bool = False, hostname: Optional[str] = None, - ): + zap: bool = False): # the ID of the OSD self.osd_id = osd_id @@ -558,6 +571,9 @@ class OSD: self.original_weight: Optional[float] = None + # Whether devices associated with the OSD should be zapped (DATA ERASED) + self.zap = zap + def start(self) -> None: if self.started: logger.debug(f"Already started draining {self}") @@ -628,6 +644,9 @@ class OSD: def destroy(self) -> bool: return self.rm_util.destroy_osd(self.osd_id) + def do_zap(self) -> str: + return self.rm_util.zap_osd(self) + def purge(self) -> bool: return 
self.rm_util.purge_osd(self.osd_id) @@ -656,6 +675,7 @@ class OSD: out['stopped'] = self.stopped out['replace'] = self.replace out['force'] = self.force + out['zap'] = self.zap out['hostname'] = self.hostname # type: ignore for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']: @@ -713,10 +733,10 @@ class OSDRemovalQueue(object): self.cleanup() # find osds that are ok-to-stop and not yet draining - ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds()) - if ok_to_stop_osds: + ready_to_drain_osds = self._ready_to_drain_osds() + if ready_to_drain_osds: # start draining those - _ = [osd.start_draining() for osd in ok_to_stop_osds] + _ = [osd.start_draining() for osd in ready_to_drain_osds] all_osds = self.all_osds() @@ -748,8 +768,12 @@ class OSDRemovalQueue(object): # stop and remove daemon assert osd.hostname is not None - CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname) - logger.info(f"Successfully removed {osd} on {osd.hostname}") + + if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'): + CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname) + logger.info(f"Successfully removed {osd} on {osd.hostname}") + else: + logger.info(f"Daemon {osd} on {osd.hostname} was already removed") if osd.replace: # mark destroyed in osdmap @@ -764,6 +788,12 @@ class OSDRemovalQueue(object): raise orchestrator.OrchestratorError(f"Could not purge {osd}") logger.info(f"Successfully purged {osd} on {osd.hostname}") + if osd.zap: + # throws an exception if the zap fails + logger.info(f"Zapping devices for {osd} on {osd.hostname}") + osd.do_zap() + logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}") + logger.debug(f"Removing {osd} from the queue.") # self could change while this is processing (osds get added from the CLI) @@ -779,6 +809,18 @@ class OSDRemovalQueue(object): for osd in self._not_in_cluster(): self.osds.remove(osd) + def _ready_to_drain_osds(self) -> 
List["OSD"]: + """ + Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can + be accommodated by the 'max_osd_draining_count' config value, considering the number of OSDs + that are already draining. + """ + draining_limit = max(1, self.mgr.max_osd_draining_count) + num_already_draining = len(self.draining_osds()) + num_to_start_draining = max(0, draining_limit - num_already_draining) + stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds()) + return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining] + def _save_to_store(self) -> None: osd_queue = [osd.to_json() for osd in self.osds] logger.debug(f"Saving {osd_queue} to store") diff --git a/src/pybind/mgr/cephadm/tests/test_osd_removal.py b/src/pybind/mgr/cephadm/tests/test_osd_removal.py index cffdcaf03fb..9347678b84d 100644 --- a/src/pybind/mgr/cephadm/tests/test_osd_removal.py +++ b/src/pybind/mgr/cephadm/tests/test_osd_removal.py @@ -3,6 +3,7 @@ import json from cephadm.services.osd import OSDRemovalQueue, OSD import pytest from tests import mock +from .fixtures import with_cephadm_module from datetime import datetime @@ -54,6 +55,33 @@ class TestOSDRemoval: # rm_util.process_removal_queue() pass + @pytest.mark.parametrize( + "max_osd_draining_count, draining_osds, idling_osds, ok_to_stop, expected", + [ + # drain one at a time, one already draining + (1, [1], [1], [True], 0), + # drain one at a time, none draining yet + (1, [], [1, 2, 3], [True, True, True], 1), + # drain one at a time, one already draining, none ok-to-stop + (1, [1], [1], [False], 0), + # drain one at a time, none draining, one ok-to-stop + (1, [], [1, 2, 3], [False, False, True], 1), + # drain three at a time, one already draining, all ok-to-stop + (3, [1], [1, 2, 3], [True, True, True], 2), + # drain two at a time, none already draining, none ok-to-stop + (2, [], [1, 2, 3], [False, False, False], 0), + # drain two at a time, none already draining, none idling + (2, [], 
[], [], 0), + ] + ) + def test_ready_to_drain_osds(self, max_osd_draining_count, draining_osds, idling_osds, ok_to_stop, expected): + with with_cephadm_module({'max_osd_draining_count': max_osd_draining_count}) as m: + with mock.patch("cephadm.services.osd.OSDRemovalQueue.draining_osds", return_value=draining_osds): + with mock.patch("cephadm.services.osd.OSDRemovalQueue.idling_osds", return_value=idling_osds): + with mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop", side_effect=ok_to_stop): + removal_queue = OSDRemovalQueue(m) + assert len(removal_queue._ready_to_drain_osds()) == expected + def test_ok_to_stop(self, rm_util): rm_util.ok_to_stop([MockOSD(1)]) rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd ok-to-stop', 'ids': ['1']}) @@ -81,6 +109,7 @@ class TestOSDRemoval: "stopped": False, "replace": False, "force": False, + "zap": False, "nodename": "node2", "drain_started_at": "2020-09-14T11:41:53.960463", "drain_stopped_at": None, diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 1497f80aaa6..fdb99413641 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -553,11 +553,13 @@ class Orchestrator(object): def remove_osds(self, osd_ids: List[str], replace: bool = False, - force: bool = False) -> OrchResult[str]: + force: bool = False, + zap: bool = False) -> OrchResult[str]: """ :param osd_ids: list of OSD IDs :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace` :param force: Forces the OSD removal process without waiting for the data to be drained first. + :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA) Note that this can only remove OSDs that were successfully created (i.e. got an OSD ID). 
""" diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py index 1b81a736e63..6d28d59fc7c 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -819,9 +819,10 @@ Usage: def _osd_rm_start(self, osd_id: List[str], replace: bool = False, - force: bool = False) -> HandleCommandResult: + force: bool = False, + zap: bool = False) -> HandleCommandResult: """Remove OSD daemons""" - completion = self.remove_osds(osd_id, replace=replace, force=force) + completion = self.remove_osds(osd_id, replace=replace, force=force, zap=zap) raise_if_exception(completion) return HandleCommandResult(stdout=completion.result_str()) diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py index 4fedaec8ae4..958277851e6 100644 --- a/src/pybind/mgr/rook/module.py +++ b/src/pybind/mgr/rook/module.py @@ -509,8 +509,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts)) return OrchResult(result_list) - def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False) -> OrchResult[str]: + def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False, zap: bool = False) -> OrchResult[str]: assert self._rook_cluster is not None + if zap: + raise RuntimeError("Rook does not support zapping devices during OSD removal.") res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command) return OrchResult(res) -- 2.39.5