def trivial_completion(f):
# type: (Callable) -> Callable[..., orchestrator.Completion]
- return ssh_completion(cls=orchestrator.Completion)(f)
-
-
-def trivial_result(val):
- # type: (Any) -> AsyncCompletion
- return AsyncCompletion(value=val, name='trivial_result')
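+ """Wrap a synchronous function so its return value is packaged in an AsyncCompletion named after the function."""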
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ return AsyncCompletion(value=f(*args, **kwargs), name=f.__name__)
+ return wrapper
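+
+ # Usage sketch (hypothetical method name, for illustration only; assumes
+ # "from functools import wraps" is imported at module scope):
+ #
+ #   @trivial_completion
+ #   def example_status(self):
+ #       return 'ok'
+ #
+ #   c = module.example_status()  # -> AsyncCompletion with value 'ok'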
@six.add_metaclass(CLICommandMeta)
# hmm!
return 0
+ @trivial_completion
def describe_service(self, service_type=None, service_name=None,
refresh=False):
if refresh:
size=self._get_spec_size(spec),
running=0,
)
- return trivial_result([s for n, s in sm.items()])
+ return list(sm.values())
+ @trivial_completion
def list_daemons(self, daemon_type=None, daemon_id=None,
host=None, refresh=False):
if refresh:
if daemon_id and daemon_id != dd.daemon_id:
continue
result.append(dd)
- return trivial_result(result)
+ return result
def service_action(self, action, service_name):
args = []
self.log.info('Remove daemons %s' % [a[0] for a in args])
return self._remove_daemons(args)
+ @trivial_completion
def remove_service(self, service_name):
self.log.info('Remove service %s' % service_name)
self.spec_store.rm(service_name)
self._kick_serve_loop()
- return trivial_result(['Removed service %s' % service_name])
+ return ['Removed service %s' % service_name]
+ @trivial_completion
def get_inventory(self, host_filter=None, refresh=False):
"""
Return the storage inventory of hosts matching the given filter.
continue
result.append(orchestrator.InventoryHost(host,
inventory.Devices(dls)))
- return trivial_result(result)
+ return result
+ @trivial_completion
def zap_device(self, host, path):
self.log.info('Zap device %s:%s' % (host, path))
out, err, code = self._run_cephadm(
self.cache.invalidate_host_devices(host)
if code:
raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
- return trivial_result('\n'.join(out + err))
+ return '\n'.join(out + err)
def blink_device_light(self, ident_fault, on, locs):
@async_map_completion
r[str(o['osd'])] = o['uuid']
return r
- def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Sequence[orchestrator.Completion]:
- completions: List[orchestrator.Completion] = list()
- for spec in specs:
- completions.extend(self._apply(spec))
- return completions
+ @trivial_completion
+ def apply_drivegroups(self, specs: List[DriveGroupSpec]):
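+ # each _apply() call now returns a status string; the decorator wraps the list in a single Completion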
+ return [self._apply(spec) for spec in specs]
- def create_osds(self, drive_group):
- # type: (DriveGroupSpec) -> orchestrator.Completion
+ @trivial_completion
+ def create_osds(self, drive_group: DriveGroupSpec):
self.log.debug("Processing DriveGroup {}".format(drive_group))
# 1) use fn_filter to determine matching_hosts
matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
drive_group.service_name(), host))
ret_msg = self._create_osd(host, cmd)
ret.append(ret_msg)
- return trivial_result(", ".join(ret))
+ return ", ".join(ret)
def _create_osd(self, host, cmd):
create_func = create_fns.get(daemon_type, None)
if not create_func:
self.log.debug('unrecognized service type %s' % daemon_type)
- return trivial_result([])
+ return False
config_func = config_fns.get(daemon_type, None)
daemons = self.cache.get_daemons_by_service(service_name)
return create_func_map(args)
+ @trivial_completion
def apply_mon(self, spec):
return self._apply(spec)
# type: (ServiceSpec) -> orchestrator.Completion
return self._add_daemon('mgr', spec, self._create_mgr)
- def _apply(self, spec: ServiceSpec):
+ def _apply(self, spec: ServiceSpec) -> str:
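+ # _apply() returns a plain status string; @trivial_completion on the callers supplies the Completion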
if spec.placement.is_empty():
# fill in default placement
defaults = {
spec.service_name(), spec.placement.pretty_str()))
self.spec_store.save(spec)
self._kick_serve_loop()
- return trivial_result("Scheduled %s update..." % spec.service_type)
+ return "Scheduled %s update..." % spec.service_type
- def apply(self, specs: List[ServiceSpec]) -> AsyncCompletion:
- return self._apply(specs)
+ @trivial_completion
+ def apply(self, specs: List[ServiceSpec]):
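+ # apply each spec individually (the old version passed the whole list to _apply())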
+ return [self._apply(spec) for spec in specs]
+ @trivial_completion
def apply_mgr(self, spec):
return self._apply(spec)
- def add_mds(self, spec):
- # type: (ServiceSpec) -> AsyncCompletion
+ def add_mds(self, spec: ServiceSpec):
return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
- def apply_mds(self, spec: ServiceSpec) -> orchestrator.Completion:
+ @trivial_completion
+ def apply_mds(self, spec: ServiceSpec):
return self._apply(spec)
def _config_mds(self, spec):
})
return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
+ @trivial_completion
def apply_rgw(self, spec):
return self._apply(spec)
return self._create_daemon('rbd-mirror', daemon_id, host,
keyring=keyring)
+ @trivial_completion
def apply_rbd_mirror(self, spec):
return self._apply(spec)
def _create_prometheus(self, daemon_id, host):
return self._create_daemon('prometheus', daemon_id, host)
+ @trivial_completion
def apply_prometheus(self, spec):
return self._apply(spec)
return self._add_daemon('node-exporter', spec,
self._create_node_exporter)
+ @trivial_completion
def apply_node_exporter(self, spec):
return self._apply(spec)
return self._add_daemon('crash', spec,
self._create_crash)
+ @trivial_completion
def apply_crash(self, spec):
return self._apply(spec)
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('grafana', spec, self._create_grafana)
- def apply_grafana(self, spec):
- # type: (ServiceSpec) -> AsyncCompletion
+ @trivial_completion
+ def apply_grafana(self, spec: ServiceSpec):
return self._apply(spec)
def _create_grafana(self, daemon_id, host):
# type: (ServiceSpec) -> AsyncCompletion
return self._add_daemon('alertmanager', spec, self._create_alertmanager)
- def apply_alertmanager(self, spec):
- # type: (ServiceSpec) -> AsyncCompletion
+ @trivial_completion
+ def apply_alertmanager(self, spec: ServiceSpec):
return self._apply(spec)
def _create_alertmanager(self, daemon_id, host):
(image_name, image_id, ceph_version))
return image_id, ceph_version
+ @trivial_completion
def upgrade_check(self, image, version):
if version:
target_name = self.container_image_base + ':v' + version
}
return json.dumps(r, indent=4, sort_keys=True)
+ @trivial_completion
def upgrade_status(self):
r = orchestrator.UpgradeStatusSpec()
if self.upgrade_state:
r.message = 'Error: ' + self.upgrade_state.get('error')
elif self.upgrade_state.get('paused'):
r.message = 'Upgrade paused'
- return trivial_result(r)
+ return r
+ @trivial_completion
def upgrade_start(self, image, version):
if self.mode != 'root':
raise OrchestratorError('upgrade is not supported in %s mode' % (
if self.upgrade_state.get('paused'):
del self.upgrade_state['paused']
self._save_upgrade_state()
- return trivial_result('Resumed upgrade to %s' %
- self.upgrade_state.get('target_name'))
- return trivial_result('Upgrade to %s in progress' %
- self.upgrade_state.get('target_name'))
+ return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
+ return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name')
self.upgrade_state = {
'target_name': target_name,
'progress_id': str(uuid.uuid4()),
self._save_upgrade_state()
self._clear_upgrade_health_checks()
self.event.set()
- return trivial_result('Initiating upgrade to %s' % (image))
+ return 'Initiating upgrade to %s' % image
+ @trivial_completion
def upgrade_pause(self):
if not self.upgrade_state:
raise OrchestratorError('No upgrade in progress')
if self.upgrade_state.get('paused'):
- return trivial_result('Upgrade to %s already paused' %
- self.upgrade_state.get('target_name'))
+ return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
self.upgrade_state['paused'] = True
self._save_upgrade_state()
- return trivial_result('Paused upgrade to %s' %
- self.upgrade_state.get('target_name'))
+ return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')
+ @trivial_completion
def upgrade_resume(self):
if not self.upgrade_state:
raise OrchestratorError('No upgrade in progress')
if not self.upgrade_state.get('paused'):
- return trivial_result('Upgrade to %s not paused' %
- self.upgrade_state.get('target_name'))
+ return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
del self.upgrade_state['paused']
self._save_upgrade_state()
self.event.set()
- return trivial_result('Resumed upgrade to %s' %
- self.upgrade_state.get('target_name'))
+ return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
+ @trivial_completion
def upgrade_stop(self):
if not self.upgrade_state:
- return trivial_result('No upgrade in progress')
+ return 'No upgrade in progress'
target_name = self.upgrade_state.get('target_name')
if 'progress_id' in self.upgrade_state:
self.remote('progress', 'complete',
self._save_upgrade_state()
self._clear_upgrade_health_checks()
self.event.set()
- return trivial_result('Stopped upgrade to %s' % target_name)
+ return 'Stopped upgrade to %s' % target_name
+ @trivial_completion
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
- force: bool = False) -> orchestrator.Completion:
+ force: bool = False):
"""
Takes a list of OSDs and schedules them for removal.
The function that takes care of the actual removal is
# trigger the serve loop to initiate the removal
self._kick_serve_loop()
- return trivial_result(f"Scheduled OSD(s) for removal")
+ return "Scheduled OSD(s) for removal"
- def remove_osds_status(self) -> orchestrator.Completion:
+ @trivial_completion
+ def remove_osds_status(self):
"""
The CLI call to retrieve an osd removal report
"""
- return trivial_result(self.rm_util.report)
+ return self.rm_util.report
- def list_specs(self, service_name=None) -> orchestrator.Completion:
+ @trivial_completion
+ def list_specs(self, service_name=None):
"""
Loads all entries from the service_spec mon_store root.
"""
- return trivial_result(self.spec_store.find(service_name=service_name))
+ return self.spec_store.find(service_name=service_name)
class BaseScheduler(object):
ps = PlacementSpec(hosts=['test'], count=1)
spec = ServiceSpec('mgr', placement=ps)
c = cephadm_module.apply_mgr(spec)
- _save_spec.assert_called_with(spec)
assert wait(cephadm_module, c) == 'Scheduled mgr update...'
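+ # check the save side-effect only after the completion has resolved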
+ _save_spec.assert_called_with(spec)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
ps = PlacementSpec(hosts=['test'], count=1)
spec = ServiceSpec('mds', 'fsname', placement=ps)
c = cephadm_module.apply_mds(spec)
- _save_spec.assert_called_with(spec)
assert wait(cephadm_module, c) == 'Scheduled mds update...'
+ _save_spec.assert_called_with(spec)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
ps = PlacementSpec(hosts=['test'], count=1)
spec = ServiceSpec('rgw', 'r.z', placement=ps)
c = cephadm_module.apply_rgw(spec)
- _save_spec.assert_called_with(spec)
assert wait(cephadm_module, c) == 'Scheduled rgw update...'
+ _save_spec.assert_called_with(spec)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
ps = PlacementSpec(hosts=['test'], count=1)
spec = ServiceSpec('rbd-mirror', placement=ps)
c = cephadm_module.apply_rbd_mirror(spec)
- _save_spec.assert_called_with(spec)
assert wait(cephadm_module, c) == 'Scheduled rbd-mirror update...'
+ _save_spec.assert_called_with(spec)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
ps = PlacementSpec(hosts=['test'], count=1)
spec = ServiceSpec('prometheus', placement=ps)
c = cephadm_module.apply_prometheus(spec)
- _save_spec.assert_called_with(spec)
assert wait(cephadm_module, c) == 'Scheduled prometheus update...'
+ _save_spec.assert_called_with(spec)
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
@mock.patch("cephadm.module.SpecStore.save")
ps = PlacementSpec(hosts=['test'], count=1)
spec = ServiceSpec('node_exporter', placement=ps)
c = cephadm_module.apply_node_exporter(spec)
- _save_spec.assert_called_with(spec)
assert wait(cephadm_module, c) == 'Scheduled node_exporter update...'
+ _save_spec.assert_called_with(spec)