return AsyncCompletion(value=val, name='trivial_result')
+def with_services(service_type=None,
+ service_name=None,
+ service_id=None,
+ node_name=None,
+ refresh=False):
+ def decorator(func):
+ @wraps(func)
+ def wrapper(self, *args, **kwargs):
+ def on_complete(services):
+ if kwargs:
+ kwargs['services'] = services
+ return func(self, *args, **kwargs)
+ else:
+ args_ = args + (services,)
+ return func(self, *args_, **kwargs)
+ return self._get_services(service_type=service_type,
+ service_name=service_name,
+ service_id=service_id,
+ node_name=node_name,
+ refresh=refresh).then(on_complete)
+ return wrapper
+ return decorator
+
class CephadmOrchestrator(MgrModule, orchestrator.Orchestrator):
_STORE_HOST_PREFIX = "host"
return self.get_hosts().then(lambda hosts: self._create_osd(hosts, drive_group))
- def remove_osds(self, name):
- def _search(daemons):
- args = [('osd.%s' % d.service_instance, d.nodename) for d in daemons]
- if not args:
- raise OrchestratorError('Unable to find osd.%s' % name)
- return self._remove_daemon(args)
- return self._get_services('osd', service_id=name).then(_search)
+ @with_services('osd')
+ def remove_osds(self, osd_ids, services):
+ # type: (List[str], List[orchestrator.ServiceDescription]) -> AsyncCompletion
+ args = [(d.name(), d.nodename) for d in services if
+ d.service_instance in osd_ids]
+
+ found = list(zip(*args))[0] if args else []
+ not_found = {osd_id for osd_id in osd_ids if 'osd.%s' % osd_id not in found}
+ if not_found:
+ raise OrchestratorError('Unable to find ODS: %s' % not_found)
+ return self._remove_daemon(args)
def _create_daemon(self, daemon_type, daemon_id, host, keyring,
extra_args=[]):
c = cephadm_module.create_osds(dg)
assert self._wait(cephadm_module, c) == "Created osd(s) on host 'test'"
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+ json.dumps([
+ dict(
+ name='osd.0',
+ style='cephadm',
+ fsid='fsid',
+ container_id='container_id',
+ version='version',
+ state='running',
+ )
+ ])
+ ))
+ def test_remove_osds(self, cephadm_module):
+ cephadm_module._cluster_fsid = "fsid"
+ with self._with_host(cephadm_module, 'test'):
+ c = cephadm_module.remove_osds(['0'])
+ out = self._wait(cephadm_module, c)
+ assert out == ["Removed osd.0 from host 'test'"]
+
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.CephadmOrchestrator.send_command")
@mock.patch("cephadm.module.CephadmOrchestrator.mon_command", mon_command)
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.blink_device_light('ident', True, [('test', '')])
assert self._wait(cephadm_module, c) == ['Set ident light for test: on']
+