HostSpec, OrchestratorError, DaemonDescriptionStatus, OrchestratorEvent
from tests import mock
from .fixtures import wait, _run_cephadm, match_glob, with_host, \
- with_cephadm_module, with_service, _deploy_cephadm_binary, make_daemons_running, async_side_effect
+ with_cephadm_module, with_service, make_daemons_running, async_side_effect
from cephadm.module import CephadmOrchestrator
"""
RGWSpec(service_id="foo"),
]
)
- @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_daemon_add(self, spec: ServiceSpec, cephadm_module):
unmanaged_spec = ServiceSpec.from_json(spec.to_json())
), CephadmOrchestrator.apply_container),
]
)
- @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
@mock.patch("subprocess.run", None)
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock())
with with_service(cephadm_module, spec, meth, 'test'):
pass
- @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_mds_config_purge(self, cephadm_module: CephadmOrchestrator):
spec = ServiceSpec('mds', service_id='fsname')