from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
HostSpec, OrchestratorError
from tests import mock
-from .fixtures import cephadm_module, wait, _run_cephadm, mon_command, match_glob
+from .fixtures import cephadm_module, wait, _run_cephadm, mon_command, match_glob, with_host
from cephadm.module import CephadmOrchestrator
class TestCephadm(object):
- @contextmanager
- def _with_host(self, m, name):
- # type: (CephadmOrchestrator, str) -> None
- wait(m, m.add_host(HostSpec(hostname=name)))
- yield
- wait(m, m.remove_host(name))
-
def test_get_unique_name(self, cephadm_module):
# type: (CephadmOrchestrator) -> None
existing = [
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_host(self, cephadm_module):
assert wait(cephadm_module, cephadm_module.get_hosts()) == []
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
assert wait(cephadm_module, cephadm_module.get_hosts()) == [HostSpec('test', 'test')]
# Be careful with backward compatibility when changing things here:
assert json.loads(cephadm_module._store['inventory']) == \
{"test": {"hostname": "test", "addr": "test", "labels": [], "status": ""}}
- with self._with_host(cephadm_module, 'second'):
+ with with_host(cephadm_module, 'second'):
assert wait(cephadm_module, cephadm_module.get_hosts()) == [
HostSpec('test', 'test'),
HostSpec('second', 'second')
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_service_ls(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
assert wait(cephadm_module, c) == []
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_device_ls(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
c = cephadm_module.get_inventory()
assert wait(cephadm_module, c) == [InventoryHost('test')]
))
def test_daemon_action(self, cephadm_module):
cephadm_module.service_cache_timeout = 10
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
c = cephadm_module.daemon_action('redeploy', 'rgw', 'myrgw.foobar')
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_mon_add(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
def test_mgr_update(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
r = cephadm_module._apply_service(ServiceSpec('mgr', placement=ps))
assert r
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm")
def test_apply_osd_save(self, _run_cephadm, cephadm_module: CephadmOrchestrator):
_run_cephadm.return_value = ('{}', '', 0)
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
spec = DriveGroupSpec(
service_id='foo',
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
def test_apply_osd_save_placement(self, _save_spec, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
json_spec = {'service_type': 'osd', 'placement': {'host_pattern': 'test'}, 'service_id': 'foo', 'data_devices': {'all': True}}
spec = ServiceSpec.from_json(json_spec)
assert isinstance(spec, DriveGroupSpec)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_create_osds(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
c = cephadm_module.create_osds(dg)
out = wait(cephadm_module, c)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_prepare_drivegroup(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=['']))
out = cephadm_module.osd_service.prepare_drivegroup(dg)
assert len(out) == 1
)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_driveselection_to_ceph_volume(self, cephadm_module, devices, preview, exp_command):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
dg = DriveGroupSpec(service_id='test.spec', placement=PlacementSpec(host_pattern='test'), data_devices=DeviceSelection(paths=devices))
ds = DriveSelection(dg, Devices([Device(path) for path in devices]))
preview = preview
))
@mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
def test_remove_osds(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_rgw_update(self, cephadm_module):
- with self._with_host(cephadm_module, 'host1'):
- with self._with_host(cephadm_module, 'host2'):
+ with with_host(cephadm_module, 'host1'):
+ with with_host(cephadm_module, 'host2'):
ps = PlacementSpec(hosts=['host1'], count=1)
c = cephadm_module.add_rgw(RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
[out] = wait(cephadm_module, c)
])
))
def test_remove_daemon(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
wait(cephadm_module, c)
c = cephadm_module.remove_daemons(['rgw.myrgw.myhost.myid'])
)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_daemon_add(self, spec: ServiceSpec, meth, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
spec.placement = PlacementSpec(hosts=['test'], count=1)
c = meth(cephadm_module, spec)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
def test_nfs(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
spec = NFSServiceSpec(
service_id='name',
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
def test_iscsi(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
ps = PlacementSpec(hosts=['test'], count=1)
spec = IscsiServiceSpec(
service_id='name',
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_blink_device_light(self, cephadm_module):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
c = cephadm_module.blink_device_light('ident', True, [('test', '', '')])
assert wait(cephadm_module, c) == ['Set ident light for test: on']
)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
def test_apply_save(self, spec: ServiceSpec, meth, cephadm_module: CephadmOrchestrator):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
if not spec.placement:
spec.placement = PlacementSpec(hosts=['test'], count=1)
c = meth(cephadm_module, spec)
def test_offline(self, _check, _get_connection, cephadm_module):
_check.return_value = '{}', '', 0
_get_connection.return_value = mock.Mock(), mock.Mock()
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
_get_connection.side_effect = HostNotFound
code, out, err = cephadm_module.check_host('test')
assert out == ''
return '{}', None, 0
with mock.patch("remoto.Connection", side_effect=[Connection(), Connection(), Connection()]):
with mock.patch("remoto.process.check", _check):
- with self._with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test'):
code, out, err = cephadm_module.check_host('test')
# First should succeed.
assert err is None
--- /dev/null
+from datetime import datetime
+
+from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
+from cephadm import CephadmOrchestrator
+from cephadm.tests.fixtures import _run_cephadm, cephadm_module, wait, match_glob, with_host
+from orchestrator import ServiceDescription
+from tests import mock
+
+@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('[]'))
+def test_service_ls(cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'host1'):
+ with with_host(cephadm_module, 'host2'):
+
+ # emulate the old scheduler:
+ c = cephadm_module.apply_rgw(
+ ServiceSpec('rgw', 'r.z', placement=PlacementSpec(host_pattern='*', count=2))
+ )
+ assert wait(cephadm_module, c) == 'Scheduled rgw.r.z update...'
+
+ cephadm_module._apply_all_services()
+ out = {o.hostname for o in wait(cephadm_module, cephadm_module.list_daemons())}
+ assert out == {'host1', 'host2'}
+
+ c = cephadm_module.apply_rgw(
+ ServiceSpec('rgw', 'r.z', placement=PlacementSpec(host_pattern='host1', count=2))
+ )
+ assert wait(cephadm_module, c) == 'Scheduled rgw.r.z update...'
+
+ cephadm_module.migration_current = 0
+ cephadm_module.migration.migrate()
+
+ # assert we need all daemons.
+ assert cephadm_module.migration_current == 0
+
+ # Sorry, for this hack, but I need to make sure, Migration thinks,
+ # we have updated all daemons already.
+ cephadm_module.cache.last_daemon_update['host1'] = datetime.now()
+ cephadm_module.cache.last_daemon_update['host2'] = datetime.now()
+
+ cephadm_module.migration.migrate()
+ assert cephadm_module.migration_current == 1
+
+ out = [o.spec.placement for o in wait(cephadm_module, cephadm_module.describe_service())]
+ assert out == [PlacementSpec(count=2, hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])]
+
+
+
+# assert_rm_service(cephadm_module, 'rgw.r.z')
+# assert_rm_daemon(cephadm_module, 'mds.name', 'test')
\ No newline at end of file