from .services.iscsi import IscsiService
from .services.ha_rgw import HA_RGWService
from .services.nfs import NFSService
-from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
+from .services.osd import RemoveUtil, OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService
from .schedule import HostAssignment
self.cache.load()
self.rm_util = RemoveUtil(self)
- self.to_remove_osds = OSDQueue()
+ self.to_remove_osds = OSDRemovalQueue()
self.rm_util.load_from_store()
self.spec_store = SpecStore(self)
return f"<OSD>(osd_id={self.osd_id}, draining={self.draining})"
-class OSDQueue(Set):
+class OSDRemovalQueue(Set):
def __init__(self) -> None:
super().__init__()
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from cephadm.serve import CephadmServe
-from cephadm.services.osd import OSD, OSDQueue
+from cephadm.services.osd import OSD, OSDRemovalQueue
try:
from typing import Any, List
remove_util=cephadm_module.rm_util
))
cephadm_module.rm_util.process_removal_queue()
- assert cephadm_module.to_remove_osds == OSDQueue()
+ assert cephadm_module.to_remove_osds == OSDRemovalQueue()
c = cephadm_module.remove_osds_status()
out = wait(cephadm_module, c)
import json
-from cephadm.services.osd import RemoveUtil, OSDQueue, OSD
+from cephadm.services.osd import RemoveUtil, OSDRemovalQueue, OSD
import pytest
from .fixtures import rm_util, osd_obj, cephadm_module
from tests import mock
cephadm_module.set_store('osd_remove_queue', data)
cephadm_module.rm_util.load_from_store()
- assert repr(cephadm_module.to_remove_osds) == 'OSDQueue({<OSD>(osd_id=35, draining=True)})'
+ assert repr(
+ cephadm_module.to_remove_osds) == 'OSDRemovalQueue({<OSD>(osd_id=35, draining=True)})'
class TestOSD:
assert osd_obj.drain_status_human() == 'done, waiting for purge'
-class TestOSDQueue:
+class TestOSDRemovalQueue:
def test_queue_size(self, osd_obj):
- q = OSDQueue()
+ q = OSDRemovalQueue()
assert q.queue_size() == 0
q.add(osd_obj)
assert q.queue_size() == 1
@mock.patch("cephadm.services.osd.OSD.start")
@mock.patch("cephadm.services.osd.OSD.exists")
def test_enqueue(self, exist, start, osd_obj):
- q = OSDQueue()
+ q = OSDRemovalQueue()
q.enqueue(osd_obj)
osd_obj.start.assert_called_once()
@mock.patch("cephadm.services.osd.OSD.stop")
@mock.patch("cephadm.services.osd.OSD.exists")
def test_rm_raise(self, exist, stop, osd_obj):
- q = OSDQueue()
+ q = OSDRemovalQueue()
with pytest.raises(KeyError):
q.rm(osd_obj)
osd_obj.stop.assert_called_once()
@mock.patch("cephadm.services.osd.OSD.stop")
@mock.patch("cephadm.services.osd.OSD.exists")
def test_rm(self, exist, stop, osd_obj):
- q = OSDQueue()
+ q = OSDRemovalQueue()
q.add(osd_obj)
q.rm(osd_obj)
osd_obj.stop.assert_called_once()
RbdMirrorService, CrashService, CephadmService, AuthEntity, CephadmExporter
from cephadm.services.iscsi import IscsiService
from cephadm.services.nfs import NFSService
-from cephadm.services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
+from cephadm.services.osd import RemoveUtil, OSDRemovalQueue, OSDService, OSD, NotFoundError
from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService