HostSpec, OrchestratorError
from tests import mock
from .fixtures import wait, _run_cephadm, match_glob, with_host, \
- with_cephadm_module, with_service, assert_rm_service, _deploy_cephadm_binary
+ with_cephadm_module, with_service, _deploy_cephadm_binary
from cephadm.module import CephadmOrchestrator
"""
with with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)
assert wait(cephadm_module, c) == []
-
- with with_daemon(cephadm_module, ServiceSpec('mds', 'name'), CephadmOrchestrator.add_mds, 'test'):
+ with with_service(cephadm_module, ServiceSpec('mds', 'name', unmanaged=True)) as _, \
+ with_daemon(cephadm_module, ServiceSpec('mds', 'name'), CephadmOrchestrator.add_mds, 'test') as _:
c = cephadm_module.list_daemons()
out = [dict(o.to_json()) for o in wait(cephadm_module, c)]
expected = [
{
- 'placement': {'hosts': ['test']},
+ 'placement': {'count': 2},
'service_id': 'name',
'service_name': 'mds.name',
'service_type': 'mds',
- 'status': {'running': 1, 'size': 0},
+ 'status': {'created': mock.ANY, 'running': 1, 'size': 2},
'unmanaged': True
},
{
def test_daemon_action(self, cephadm_module: CephadmOrchestrator):
cephadm_module.service_cache_timeout = 10
with with_host(cephadm_module, 'test'):
- with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
+ with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
+ with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
c = cephadm_module.daemon_action('redeploy', 'rgw.' + daemon_id)
assert wait(cephadm_module,
def test_daemon_action_fail(self, cephadm_module: CephadmOrchestrator):
cephadm_module.service_cache_timeout = 10
with with_host(cephadm_module, 'test'):
- with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
+ with with_service(cephadm_module, RGWSpec(service_id='myrgw.foobar', unmanaged=True)) as _, \
+ with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
with mock.patch('ceph_module.BaseMgrModule._ceph_send_command') as _ceph_send_command:
_ceph_send_command.side_effect = Exception("myerror")
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
def test_mon_add(self, cephadm_module):
with with_host(cephadm_module, 'test'):
- ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
- c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
- assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
-
- with pytest.raises(OrchestratorError, match="Must set public_network config option or specify a CIDR network,"):
- ps = PlacementSpec(hosts=['test'], count=1)
+ with with_service(cephadm_module, ServiceSpec(service_type='mon', unmanaged=True)):
+ ps = PlacementSpec(hosts=['test:0.0.0.0=a'], count=1)
c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
- wait(cephadm_module, c)
+ assert wait(cephadm_module, c) == ["Deployed mon.a on host 'test'"]
+
+ with pytest.raises(OrchestratorError, match="Must set public_network config option or specify a CIDR network,"):
+ ps = PlacementSpec(hosts=['test'], count=1)
+ c = cephadm_module.add_mon(ServiceSpec('mon', placement=ps))
+ wait(cephadm_module, c)
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
def test_mgr_update(self, cephadm_module):
def test_rgw_update(self, cephadm_module):
with with_host(cephadm_module, 'host1'):
with with_host(cephadm_module, 'host2'):
- ps = PlacementSpec(hosts=['host1'], count=1)
- c = cephadm_module.add_rgw(
- RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
- [out] = wait(cephadm_module, c)
- match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
+ with with_service(cephadm_module, RGWSpec(rgw_realm='realm', rgw_zone='zone1', unmanaged=True)):
+ ps = PlacementSpec(hosts=['host1'], count=1)
+ c = cephadm_module.add_rgw(
+ RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
+ [out] = wait(cephadm_module, c)
+ match_glob(out, "Deployed rgw.realm.zone1.host1.* on host 'host1'")
- ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
- r = CephadmServe(cephadm_module)._apply_service(
- RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
- assert r
+ ps = PlacementSpec(hosts=['host1', 'host2'], count=2)
+ r = CephadmServe(cephadm_module)._apply_service(
+ RGWSpec(rgw_realm='realm', rgw_zone='zone1', placement=ps))
+ assert r
- assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host1')
- assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host2')
+ assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host1')
+ assert_rm_daemon(cephadm_module, 'rgw.realm.zone1', 'host2')
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(
json.dumps([
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.cephadmservice.RgwService.create_realm_zonegroup_zone", lambda _, __, ___: None)
def test_daemon_add(self, spec: ServiceSpec, meth, cephadm_module):
+ unmanaged_spec = ServiceSpec.from_json(spec.to_json())
+ unmanaged_spec.unmanaged = True
with with_host(cephadm_module, 'test'):
- with with_daemon(cephadm_module, spec, meth, 'test'):
- pass
+ with with_service(cephadm_module, unmanaged_spec):
+ with with_daemon(cephadm_module, spec, meth, 'test'):
+ pass
@mock.patch("cephadm.serve.CephadmServe._run_cephadm")
def test_daemon_add_fail(self, _run_cephadm, cephadm_module):
with with_host(cephadm_module, 'test'):
spec = ServiceSpec(
service_type='mgr',
- placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1)
+ placement=PlacementSpec(hosts=[HostPlacementSpec('test', '', 'x')], count=1),
+ unmanaged=True
)
- _run_cephadm.side_effect = OrchestratorError('fail')
- with pytest.raises(OrchestratorError):
- wait(cephadm_module, cephadm_module.add_mgr(spec))
- cephadm_module.assert_issued_mon_command({
- 'prefix': 'auth rm',
- 'entity': 'mgr.x',
- })
+ with with_service(cephadm_module, spec):
+ _run_cephadm.side_effect = OrchestratorError('fail')
+ with pytest.raises(OrchestratorError):
+ wait(cephadm_module, cephadm_module.add_mgr(spec))
+ cephadm_module.assert_issued_mon_command({
+ 'prefix': 'auth rm',
+ 'entity': 'mgr.x',
+ })
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
pool='pool',
namespace='namespace',
placement=ps)
- c = cephadm_module.add_nfs(spec)
- [out] = wait(cephadm_module, c)
- match_glob(out, "Deployed nfs.name.* on host 'test'")
-
- assert_rm_daemon(cephadm_module, 'nfs.name.test', 'test')
+ unmanaged_spec = ServiceSpec.from_json(spec.to_json())
+ unmanaged_spec.unmanaged = True
+ with with_service(cephadm_module, unmanaged_spec):
+ c = cephadm_module.add_nfs(spec)
+ [out] = wait(cephadm_module, c)
+ match_glob(out, "Deployed nfs.name.* on host 'test'")
- # Hack. We never created the service, but we now need to remove it.
- # this is in contrast to the other services, which don't create this service
- # automatically.
- assert_rm_service(cephadm_module, 'nfs.name')
+ assert_rm_daemon(cephadm_module, 'nfs.name.test', 'test')
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.rados", mock.MagicMock())
api_user='user',
api_password='password',
placement=ps)
- c = cephadm_module.add_iscsi(spec)
- [out] = wait(cephadm_module, c)
- match_glob(out, "Deployed iscsi.name.* on host 'test'")
+ unmanaged_spec = ServiceSpec.from_json(spec.to_json())
+ unmanaged_spec.unmanaged = True
+ with with_service(cephadm_module, unmanaged_spec):
- assert_rm_daemon(cephadm_module, 'iscsi.name.test', 'test')
+ c = cephadm_module.add_iscsi(spec)
+ [out] = wait(cephadm_module, c)
+ match_glob(out, "Deployed iscsi.name.* on host 'test'")
- # Hack. We never created the service, but we now need to remove it.
- # this is in contrast to the other services, which don't create this service
- # automatically.
- assert_rm_service(cephadm_module, 'iscsi.name')
+ assert_rm_daemon(cephadm_module, 'iscsi.name.test', 'test')
@pytest.mark.parametrize(
"on_bool",