# if no mds found, return empty Daemon Desc
return DaemonDescription()
+ def purge(self, service_name: str) -> None:
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': service_name,
+ 'name': 'mds_join_fs',
+ })
+
class RgwService(CephService):
TYPE = 'rgw'
with with_service(cephadm_module, spec, meth, 'test'):
pass
+ @mock.patch("cephadm.serve.CephadmServe._deploy_cephadm_binary", _deploy_cephadm_binary('test'))
+ @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+ def test_mds_config_purge(self, cephadm_module: CephadmOrchestrator):
+ spec = ServiceSpec('mds', service_id='fsname')
+ with with_host(cephadm_module, 'test'):
+ with with_service(cephadm_module, spec, host='test'):
+ ret, out, err = cephadm_module.check_mon_command({
+ 'prefix': 'config get',
+ 'who': spec.service_name(),
+ 'key': 'mds_join_fs',
+ })
+ assert out == 'fsname'
+ ret, out, err = cephadm_module.check_mon_command({
+ 'prefix': 'config get',
+ 'who': spec.service_name(),
+ 'key': 'mds_join_fs',
+ })
+ assert not out
+
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.services.cephadmservice.CephadmService.ok_to_stop")
def test_daemon_ok_to_stop(self, ok_to_stop, cephadm_module: CephadmOrchestrator):
self.mock_store_set('config', f'{cmd["who"]}/{cmd["name"]}', cmd['value'])
return ''
+ def config_rm():
+ self.mock_store_set('config', f'{cmd["who"]}/{cmd["name"]}', None)
+ return ''
+
def config_dump():
r = []
for prefix, value in self.mock_store_preifx('config', '').items():
outb = config_set()
elif cmd['prefix'] == 'config dump':
outb = config_dump()
+ elif cmd['prefix'] == 'config rm':
+ outb = config_rm()
elif hasattr(self, '_mon_command_mock_' + cmd['prefix'].replace(' ', '_')):
a = getattr(self, '_mon_command_mock_' + cmd['prefix'].replace(' ', '_'))
outb = a(cmd)