import logging
import time
-from typing import List, NamedTuple, Dict, Any, Set
+from typing import List, NamedTuple, Dict, Any, Set, Union
import orchestrator
from orchestrator import OrchestratorError
def __init__(self, mgr):
self.mgr = mgr
self.to_remove_osds: Set[OSDRemoval] = set()
- self.osd_removal_report: dict = dict()
+ self.osd_removal_report: Dict[OSDRemoval, Union[int,str]] = dict()
def _remove_osds_bg(self) -> None:
raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
completion = self.mgr._remove_daemon([(osd.fullname, osd.nodename, True)])
- completion.add_progress('Removing OSDs', self)
+ completion.add_progress('Removing OSDs', self.mgr)
completion.update_progress = True
if completion:
while not completion.has_result:
logger.debug(f"Removing {osd.osd_id} from the queue.")
self.to_remove_osds.remove(osd)
- def _generate_osd_removal_status(self) -> Dict[Any, object]:
+ def _generate_osd_removal_status(self) -> Dict[OSDRemoval, Union[int,str]]:
"""
Generate a OSD report that can be printed to the CLI
"""
logger.debug("Assembling report for osd rm status")
- report = {}
+ report: Dict[OSDRemoval, Union[int,str]] = {}
for osd in self.to_remove_osds:
pg_count = self.get_pg_count(str(osd.osd_id))
report[osd] = pg_count if pg_count != -1 else 'n/a'
+import datetime
import json
from contextlib import contextmanager
import pytest
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
+from cephadm.osd import OSDRemoval
try:
from typing import Any
)
])
))
+ @mock.patch("cephadm.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
def test_remove_osds(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
+
c = cephadm_module.remove_daemons(['osd.0'], False)
out = wait(cephadm_module, c)
assert out == ["Removed osd.0 from host 'test'"]
+ osd_removal_op = OSDRemoval(0, False, False, 'test', 'osd.0', datetime.datetime.utcnow())
+ cephadm_module.rm_util.to_remove_osds.add(osd_removal_op)
+ cephadm_module.rm_util._remove_osds_bg()
+ assert cephadm_module.rm_util.to_remove_osds == set()
+
+ c = cephadm_module.remove_osds_status()
+ out = wait(cephadm_module, c)
+ assert out == {osd_removal_op: 0}
+
+
+
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_mds(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):