--- /dev/null
+tasks:
+- cephadm.shell:
+ host.a:
+ - |
+ set -e
+ set -x
+ ceph orch ps
+ ceph orch device ls
+ DEVID=$(ceph device ls | grep osd.1 | awk '{print $1}')
+ HOST=$(ceph orch device ls | grep "$DEVID" | awk '{print $1}')
+ DEV=$(ceph orch device ls | grep "$DEVID" | awk '{print $2}')
+ echo "host $HOST, dev $DEV, devid $DEVID"
+ ceph orch osd rm --zap --replace 1
+ while ceph orch osd rm status | grep ^1 ; do sleep 5 ; done
+ while ! ceph osd dump | grep osd.1 | grep "up\s*in" ; do sleep 5 ; done
return dd
raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
+ def has_daemon(self, daemon_name: str) -> bool:
+ try:
+ self.get_daemon(daemon_name)
+ except orchestrator.OrchestratorError:
+ return False
+ return True
+
def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
dd = copy(dd_orig)
default=4721,
desc='First port the agent will try to bind to (it will also try up to 1000 subsequent ports if blocked)'
),
+ Option(
+ 'max_osd_draining_count',
+ type='int',
+ default=10,
+ desc='max number of osds that will be drained simultaneously when osds are removed'
+ ),
]
def __init__(self, *args: Any, **kwargs: Any):
self.endpoint_port = 0
self.agent_starting_port = 0
self.apply_spec_fails: List[Tuple[str, str]] = []
+ self.max_osd_draining_count = 10
self.notify('mon_map', None)
self.config_notify()
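
The hard-coded 10 above is only a typing placeholder; the effective value is meant to come from the module option declared earlier. A minimal sketch of the standard mgr-module refresh that `self.config_notify()` triggers (an assumed pattern, not copied verbatim from cephadm):

    # Hedged sketch: MODULE_OPTIONS and get_module_option() are stock MgrModule
    # APIs, so `ceph config set mgr mgr/cephadm/max_osd_draining_count 5` is
    # picked up on the next config_notify() without restarting the mgr.
    def config_notify(self) -> None:
        for opt in self.MODULE_OPTIONS:
            setattr(self, opt['name'], self.get_module_option(opt['name']))
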
@handle_orch_error
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
- force: bool = False) -> str:
+ force: bool = False,
+ zap: bool = False) -> str:
"""
Takes a list of OSDs and schedules them for removal.
The function that takes care of the actual removal is
self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
replace=replace,
force=force,
+ zap=zap,
hostname=daemon.hostname,
process_started_at=datetime_now(),
remove_util=self.to_remove_osds.rm_util))
self.mgr.log.info(f"{osd} weight is now {weight}")
return True
+ def zap_osd(self, osd: "OSD") -> str:
+ "Zaps all devices that are associated with an OSD"
+ if osd.hostname is not None:
+ out, err, code = CephadmServe(self.mgr)._run_cephadm(
+ osd.hostname, 'osd', 'ceph-volume',
+ ['--', 'lvm', 'zap', '--destroy', '--osd-id', str(osd.osd_id)],
+ error_ok=True)
+ self.mgr.cache.invalidate_host_devices(osd.hostname)
+ if code:
+ raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
+ return '\n'.join(out + err)
+ raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown")
+
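
A small caller-side sketch of the contract above (`rm_util`, `osd`, and `logger` stand in for the real objects): a non-zero ceph-volume exit status surfaces as OrchestratorError, so anything that wants to continue past a failed wipe must catch it; the removal queue further down deliberately lets it propagate.

    # Hedged illustration only, not part of the change.
    try:
        report = rm_util.zap_osd(osd)      # combined ceph-volume stdout/stderr
        logger.info(f"zap output for osd.{osd.osd_id}:\n{report}")
    except OrchestratorError as e:
        logger.warning(f"devices for osd.{osd.osd_id} were not wiped: {e}")
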
def safe_to_destroy(self, osd_ids: List[int]) -> bool:
""" Queries the safe-to-destroy flag for OSDs """
cmd_args = {'prefix': 'osd safe-to-destroy',
replace: bool = False,
force: bool = False,
hostname: Optional[str] = None,
- ):
+ zap: bool = False):
# the ID of the OSD
self.osd_id = osd_id
self.original_weight: Optional[float] = None
+ # Whether devices associated with the OSD should be zapped (DATA ERASED)
+ self.zap = zap
+
def start(self) -> None:
if self.started:
logger.debug(f"Already started draining {self}")
def destroy(self) -> bool:
return self.rm_util.destroy_osd(self.osd_id)
+ def do_zap(self) -> str:
+ return self.rm_util.zap_osd(self)
+
def purge(self) -> bool:
return self.rm_util.purge_osd(self.osd_id)
out['stopped'] = self.stopped
out['replace'] = self.replace
out['force'] = self.force
+ out['zap'] = self.zap
out['hostname'] = self.hostname # type: ignore
for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
self.cleanup()
# find osds that are ok-to-stop and not yet draining
- ok_to_stop_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
- if ok_to_stop_osds:
+ ready_to_drain_osds = self._ready_to_drain_osds()
+ if ready_to_drain_osds:
# start draining those
- _ = [osd.start_draining() for osd in ok_to_stop_osds]
+ _ = [osd.start_draining() for osd in ready_to_drain_osds]
all_osds = self.all_osds()
# stop and remove daemon
assert osd.hostname is not None
- CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
- logger.info(f"Successfully removed {osd} on {osd.hostname}")
+
+ if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
+ CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
+ logger.info(f"Successfully removed {osd} on {osd.hostname}")
+ else:
+ logger.info(f"Daemon {osd} on {osd.hostname} was already removed")
if osd.replace:
# mark destroyed in osdmap
raise orchestrator.OrchestratorError(f"Could not purge {osd}")
logger.info(f"Successfully purged {osd} on {osd.hostname}")
+ if osd.zap:
+ # throws an exception if the zap fails
+ logger.info(f"Zapping devices for {osd} on {osd.hostname}")
+ osd.do_zap()
+ logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}")
+
logger.debug(f"Removing {osd} from the queue.")
# self could change while this is processing (osds get added from the CLI)
for osd in self._not_in_cluster():
self.osds.remove(osd)
+ def _ready_to_drain_osds(self) -> List["OSD"]:
+ """
+ Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can
+ be accommodated by the 'max_osd_draining_count' config value, considering the number of OSDs
+ that are already draining.
+ """
+ draining_limit = max(1, self.mgr.max_osd_draining_count)
+ num_already_draining = len(self.draining_osds())
+ num_to_start_draining = max(0, draining_limit - num_already_draining)
+ stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
+ return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]
+
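
A standalone worked example of the throttling arithmetic above (numbers invented for illustration): with max_osd_draining_count = 3 and one OSD already draining, at most two of the stoppable OSDs are started in this pass.

    # Hedged, self-contained illustration of the slice in _ready_to_drain_osds().
    draining_limit = max(1, 3)                   # max_osd_draining_count
    num_already_draining = 1                     # len(self.draining_osds())
    num_to_start = max(0, draining_limit - num_already_draining)   # -> 2
    stoppable = ['osd.2', 'osd.4', 'osd.7']      # find_osd_stop_threshold(...)
    assert stoppable[:num_to_start] == ['osd.2', 'osd.4']
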
def _save_to_store(self) -> None:
osd_queue = [osd.to_json() for osd in self.osds]
logger.debug(f"Saving {osd_queue} to store")
from cephadm.services.osd import OSDRemovalQueue, OSD
import pytest
from tests import mock
+from .fixtures import with_cephadm_module
from datetime import datetime
# rm_util.process_removal_queue()
pass
+ @pytest.mark.parametrize(
+ "max_osd_draining_count, draining_osds, idling_osds, ok_to_stop, expected",
+ [
+ # drain one at a time, one already draining
+ (1, [1], [1], [True], 0),
+ # drain one at a time, none draining yet
+ (1, [], [1, 2, 3], [True, True, True], 1),
+ # drain one at a time, one already draining, none ok-to-stop
+ (1, [1], [1], [False], 0),
+ # drain one at a time, none draining, one ok-to-stop
+ (1, [], [1, 2, 3], [False, False, True], 1),
+ # drain three at a time, one already draining, all ok-to-stop
+ (3, [1], [1, 2, 3], [True, True, True], 2),
+ # drain two at a time, none already draining, none ok-to-stop
+ (2, [], [1, 2, 3], [False, False, False], 0),
+ # drain two at a time, none already draining, none idling
+ (2, [], [], [], 0),
+ ]
+ )
+ def test_ready_to_drain_osds(self, max_osd_draining_count, draining_osds, idling_osds, ok_to_stop, expected):
+ with with_cephadm_module({'max_osd_draining_count': max_osd_draining_count}) as m:
+ with mock.patch("cephadm.services.osd.OSDRemovalQueue.draining_osds", return_value=draining_osds):
+ with mock.patch("cephadm.services.osd.OSDRemovalQueue.idling_osds", return_value=idling_osds):
+ with mock.patch("cephadm.services.osd.RemoveUtil.ok_to_stop", side_effect=ok_to_stop):
+ removal_queue = OSDRemovalQueue(m)
+ assert len(removal_queue._ready_to_drain_osds()) == expected
+
def test_ok_to_stop(self, rm_util):
rm_util.ok_to_stop([MockOSD(1)])
rm_util._run_mon_cmd.assert_called_with({'prefix': 'osd ok-to-stop', 'ids': ['1']})
"stopped": False,
"replace": False,
"force": False,
+ "zap": False,
"nodename": "node2",
"drain_started_at": "2020-09-14T11:41:53.960463",
"drain_stopped_at": None,
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
- force: bool = False) -> OrchResult[str]:
+ force: bool = False,
+ zap: bool = False) -> OrchResult[str]:
"""
:param osd_ids: list of OSD IDs
:param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
:param force: Forces the OSD removal process without waiting for the data to be drained first.
+ :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
Note that this can only remove OSDs that were successfully
created (i.e. got an OSD ID).
"""
def _osd_rm_start(self,
osd_id: List[str],
replace: bool = False,
- force: bool = False) -> HandleCommandResult:
+ force: bool = False,
+ zap: bool = False) -> HandleCommandResult:
"""Remove OSD daemons"""
- completion = self.remove_osds(osd_id, replace=replace, force=force)
+ completion = self.remove_osds(osd_id, replace=replace, force=force, zap=zap)
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
return OrchResult(result_list)
- def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False) -> OrchResult[str]:
+ def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False, zap: bool = False) -> OrchResult[str]:
assert self._rook_cluster is not None
+ if zap:
+ raise RuntimeError("Rook does not support zapping devices during OSD removal.")
res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command)
return OrchResult(res)