]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/orchestrator: add --zap flag to 'orch osd rm' 43260/head
authorCory Snyder <csnyder@iland.com>
Fri, 10 Sep 2021 10:59:35 +0000 (06:59 -0400)
committerCory Snyder <csnyder@iland.com>
Tue, 26 Oct 2021 13:48:32 +0000 (09:48 -0400)
Adds the ability to zap OSD devices after removal, implemented as a flag
on the 'orch osd rm' command.

Fixes: https://tracker.ceph.com/issues/43692
Signed-off-by: Cory Snyder <csnyder@iland.com>
qa/suites/orch/cephadm/osds/2-ops/rm-zap-flag.yaml [new file with mode: 0644]
src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/services/osd.py
src/pybind/mgr/cephadm/tests/test_osd_removal.py
src/pybind/mgr/orchestrator/_interface.py
src/pybind/mgr/orchestrator/module.py
src/pybind/mgr/rook/module.py

diff --git a/qa/suites/orch/cephadm/osds/2-ops/rm-zap-flag.yaml b/qa/suites/orch/cephadm/osds/2-ops/rm-zap-flag.yaml
new file mode 100644 (file)
index 0000000..8f07f6d
--- /dev/null
@@ -0,0 +1,15 @@
+tasks:
+- cephadm.shell:
+    host.a:
+      - |
+        set -e
+        set -x
+        ceph orch ps
+        ceph orch device ls
+        DEVID=$(ceph device ls | grep osd.1 | awk '{print $1}')
+        HOST=$(ceph orch device ls | grep "$DEVID" | awk '{print $1}')
+        DEV=$(ceph orch device ls | grep "$DEVID" | awk '{print $2}')
+        echo "host $HOST, dev $DEV, devid $DEVID"
+        ceph orch osd rm --zap --replace 1
+        while ceph orch osd rm status | grep ^1 ; do sleep 5 ; done
+        while ! ceph osd dump | grep osd.1 | grep "up\s*in" ; do sleep 5 ; done
index 67ccc612ff78488cabe9e47b5dae9770e5cfdd9d..06f2494c2aade2c239c8812849d12dbc8b2683fd 100644 (file)
@@ -828,6 +828,13 @@ class HostCache():
                     return dd
         raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
 
+    def has_daemon(self, daemon_name: str) -> bool:
+        try:
+            self.get_daemon(daemon_name)
+        except orchestrator.OrchestratorError:
+            return False
+        return True
+
     def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
         def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
             dd = copy(dd_orig)
index 711ddc69b54af1a8d4506b73f64a5d6767ff779e..5b82e625a2955429eb695062f8fe4a060ddbd70f 100644 (file)
@@ -361,6 +361,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             default=4721,
             desc='First port agent will try to bind to (will also try up to next 1000 subsequent ports if blocked)'
         ),
+        Option(
+            'max_osd_draining_count',
+            type='int',
+            default=10,
+            desc='max number of osds that will be drained simultaneously when osds are removed'
+        ),
     ]
 
     def __init__(self, *args: Any, **kwargs: Any):
@@ -426,6 +432,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             self.endpoint_port = 0
             self.agent_starting_port = 0
             self.apply_spec_fails: List[Tuple[str, str]] = []
+            self.max_osd_draining_count = 10
 
         self.notify('mon_map', None)
         self.config_notify()
@@ -2591,7 +2598,8 @@ Then run the following:
     @handle_orch_error
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
-                    force: bool = False) -> str:
+                    force: bool = False,
+                    zap: bool = False) -> str:
         """
         Takes a list of OSDs and schedules them for removal.
         The function that takes care of the actual removal is
@@ -2612,6 +2620,7 @@ Then run the following:
                 self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
                                                 replace=replace,
                                                 force=force,
+                                                zap=zap,
                                                 hostname=daemon.hostname,
                                                 process_started_at=datetime_now(),
                                                 remove_util=self.to_remove_osds.rm_util))
index f5996eeae764b4faf5b8ccf99347fe93ff3c681c..94cb6fa076de7a44f9a92edd19a987a145c94878 100644 (file)
@@ -467,6 +467,19 @@ class RemoveUtil(object):
         self.mgr.log.info(f"{osd} weight is now {weight}")
         return True
 
+    def zap_osd(self, osd: "OSD") -> str:
+        "Zaps all devices that are associated with an OSD"
+        if osd.hostname is not None:
+            out, err, code = CephadmServe(self.mgr)._run_cephadm(
+                osd.hostname, 'osd', 'ceph-volume',
+                ['--', 'lvm', 'zap', '--destroy', '--osd-id', str(osd.osd_id)],
+                error_ok=True)
+            self.mgr.cache.invalidate_host_devices(osd.hostname)
+            if code:
+                raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
+            return '\n'.join(out + err)
+        raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown")
+
     def safe_to_destroy(self, osd_ids: List[int]) -> bool:
         """ Queries the safe-to-destroy flag for OSDs """
         cmd_args = {'prefix': 'osd safe-to-destroy',
@@ -521,7 +534,7 @@ class OSD:
                  replace: bool = False,
                  force: bool = False,
                  hostname: Optional[str] = None,
-                 ):
+                 zap: bool = False):
         # the ID of the OSD
         self.osd_id = osd_id
 
@@ -558,6 +571,9 @@ class OSD:
 
         self.original_weight: Optional[float] = None
 
+        # Whether devices associated with the OSD should be zapped (DATA ERASED)
+        self.zap = zap
+
     def start(self) -> None:
         if self.started:
             logger.debug(f"Already started draining {self}")
@@ -628,6 +644,9 @@ class OSD:
     def destroy(self) -> bool:
         return self.rm_util.destroy_osd(self.osd_id)
 
+    def do_zap(self) -> str:
+        return self.rm_util.zap_osd(self)
+
     def purge(self) -> bool:
         return self.rm_util.purge_osd(self.osd_id)
 
@@ -656,6 +675,7 @@ class OSD:
         out['stopped'] = self.stopped
         out['replace'] = self.replace
         out['force'] = self.force
+        out['zap'] = self.zap
         out['hostname'] = self.hostname  # type: ignore
 
         for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
@@ -713,10 +733,10 @@ class OSDRemovalQueue(object):
         self.cleanup()
 
         # find osds that are ok-to-stop and not yet draining
-        ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
-        if ok_to_stop_osds:
+        ready_to_drain_osds = self._ready_to_drain_osds()
+        if ready_to_drain_osds:
             # start draining those
-            _ = [osd.start_draining() for osd in ok_to_stop_osds]
+            _ = [osd.start_draining() for osd in ready_to_drain_osds]
 
         all_osds = self.all_osds()
 
@@ -748,8 +768,12 @@ class OSDRemovalQueue(object):
 
             # stop and remove daemon
             assert osd.hostname is not None
-            CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
-            logger.info(f"Successfully removed {osd} on {osd.hostname}")
+
+            if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
+                CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
+                logger.info(f"Successfully removed {osd} on {osd.hostname}")
+            else:
+                logger.info(f"Daemon {osd} on {osd.hostname} was already removed")
 
             if osd.replace:
                 # mark destroyed in osdmap
@@ -764,6 +788,12 @@ class OSDRemovalQueue(object):
                     raise orchestrator.OrchestratorError(f"Could not purge {osd}")
                 logger.info(f"Successfully purged {osd} on {osd.hostname}")
 
+            if osd.zap:
+                # throws an exception if the zap fails
+                logger.info(f"Zapping devices for {osd} on {osd.hostname}")
+                osd.do_zap()
+                logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}")
+
             logger.debug(f"Removing {osd} from the queue.")
 
         # self could change while this is processing (osds get added from the CLI)
@@ -779,6 +809,18 @@ class OSDRemovalQueue(object):
             for osd in self._not_in_cluster():
                 self.osds.remove(osd)
 
+    def _ready_to_drain_osds(self) -> List["OSD"]:
+        """
+        Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can
+        be accomodated by the 'max_osd_draining_count' config value, considering the number of OSDs
+        that are already draining.
+        """
+        draining_limit = max(1, self.mgr.max_osd_draining_count)
+        num_already_draining = len(self.draining_osds())
+        num_to_start_draining = max(0, draining_limit - num_already_draining)
+        stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
+        return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]
+
     def _save_to_store(self) -> None:
         osd_queue = [osd.to_json() for osd in self.osds]
         logger.debug(f"Saving {osd_queue} to store")
index cffdcaf03fbcbed85095218b8deca730f9476505..9347678b84daa5a42ced6af71aa9b833ce87b8c0 100644 (file)
@@ -3,6 +3,7 @@ import json
 from cephadm.services.osd import OSDRemovalQueue, OSD
 import pytest
 from tests import mock
+from .fixtures import with_cephadm_module
 from datetime import datetime
 
 
@@ -54,6 +55,33 @@ class TestOSDRemoval:
         # rm_util.process_removal_queue()
         pass
 
+    @pytest.mark.parametrize(
+        "max_osd_draining_count, draining_osds, idling_osds, ok_to_stop, expected",
+        [
+            # drain one at a time, one already draining
+            (1, [1], [1], [True], 0),
+            # drain one at a time, none draining yet
+            (1, [], [1, 2, 3], [True, True, True], 1),
+            # drain one at a time, one already draining, none ok-to-stop
+            (1, [1], [1], [False], 0),
+            # drain one at a time, none draining, one ok-to-stop
+            (1, [], [1, 2, 3], [False, False, True], 1),
+            # drain three at a time, one already draining, all ok-to-stop
+            (3, [1], [1, 2, 3], [True, True, True], 2),
+            # drain two at a time, none already draining, none ok-to-stop
+            (2, [], [1, 2, 3], [False, False, False], 0),
+            # drain two at a time, none already draining, none idling
+            (2, [], [], [], 0),
+        ]
+    )
+    def test_ready_to_drain_osds(self, max_osd_draining_count, draining_osds, idling_osds, ok_to_stop, expected):
+        with with_cephadm_module({'max_osd_draining_count': max_osd_draining_count}) as m:
+            with mock.patch("cephadm.services.osd.OSDRemovalQueue.draining_osds", return_value=draining_osds):
+                with mock.patch("cephadm.services.osd.OSDRemovalQueue.idling_osds", return_value=idling_osds):
+                    with mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop", side_effect=ok_to_stop):
+                        removal_queue = OSDRemovalQueue(m)
+                        assert len(removal_queue._ready_to_drain_osds()) == expected
+
     def test_ok_to_stop(self, rm_util):
         rm_util.ok_to_stop([MockOSD(1)])
         rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd ok-to-stop', 'ids': ['1']})
@@ -81,6 +109,7 @@ class TestOSDRemoval:
                 "stopped": False,
                 "replace": False,
                 "force": False,
+                "zap": False,
                 "nodename": "node2",
                 "drain_started_at": "2020-09-14T11:41:53.960463",
                 "drain_stopped_at": None,
index 1497f80aaa67e341d6a7b0a438b096fd2265d49a..fdb994136414fe276e59140b69c9452b47a732d8 100644 (file)
@@ -553,11 +553,13 @@ class Orchestrator(object):
 
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
-                    force: bool = False) -> OrchResult[str]:
+                    force: bool = False,
+                    zap: bool = False) -> OrchResult[str]:
         """
         :param osd_ids: list of OSD IDs
         :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
         :param force: Forces the OSD removal process without waiting for the data to be drained first.
+        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
         Note that this can only remove OSDs that were successfully
         created (i.e. got an OSD ID).
         """
index 1b81a736e63996bbda2d1ef0f5748d00a380d5c0..6d28d59fc7c82912622ca15bdf0e36180fd083b5 100644 (file)
@@ -819,9 +819,10 @@ Usage:
     def _osd_rm_start(self,
                       osd_id: List[str],
                       replace: bool = False,
-                      force: bool = False) -> HandleCommandResult:
+                      force: bool = False,
+                      zap: bool = False) -> HandleCommandResult:
         """Remove OSD daemons"""
-        completion = self.remove_osds(osd_id, replace=replace, force=force)
+        completion = self.remove_osds(osd_id, replace=replace, force=force, zap=zap)
         raise_if_exception(completion)
         return HandleCommandResult(stdout=completion.result_str())
 
index 4fedaec8ae409abcec126c48aea094be05c5b77c..958277851e6fe350ea8290b52cae6b674fe91bdf 100644 (file)
@@ -509,8 +509,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
         return OrchResult(result_list)
 
-    def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False) -> OrchResult[str]:
+    def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False, zap: bool = False) -> OrchResult[str]:
         assert self._rook_cluster is not None
+        if zap:
+            raise RuntimeError("Rook does not support zapping devices during OSD removal.")
         res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command)
         return OrchResult(res)