return AsyncCompletion(value=val, name='trivial_result')
-def with_daemons(daemon_type=None,
- daemon_id=None,
- service_name=None,
- host=None,
- refresh=False):
- def decorator(func):
- @wraps(func)
- def wrapper(self, *args, **kwargs):
- def on_complete(daemons):
- if kwargs:
- kwargs['daemons'] = daemons
- return func(self, *args, **kwargs)
- else:
- args_ = args + (daemons,)
- return func(self, *args_, **kwargs)
- return self._get_daemons(daemon_type=daemon_type,
- daemon_id=daemon_id,
- service_name=service_name,
- host=host,
- refresh=refresh).then(on_complete)
- return wrapper
- return decorator
-
@six.add_metaclass(CLICommandMeta)
class CephadmOrchestrator(MgrModule, orchestrator.OrchestratorClientMixin):
self.health_checks[alert_id] = alert
self.set_health_checks(self.health_checks)
- def _do_upgrade(self, daemons):
- # type: (List[orchestrator.DaemonDescription]) -> Optional[AsyncCompletion]
+ def _do_upgrade(self):
+ # type: () -> Optional[AsyncCompletion]
if not self.upgrade_state:
self.log.debug('_do_upgrade no state, exiting')
return None
if opt['name'] == 'container_image':
image_settings[opt['section']] = opt['value']
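+ # build the daemon list from the per-host cache (no longer passed in by the caller)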
+ daemons = []
+ for host, di in self.daemon_cache.items():
+ for name, dd in di['daemons'].items():
+ daemons.append(dd)
+
for daemon_type in ['mgr', 'mon', 'osd', 'rgw', 'mds']:
self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
need_upgrade_self = False
self._check_for_strays()
if self.upgrade_state and not self.upgrade_state.get('paused'):
- completion = self._do_upgrade(daemons)
+ completion = self._do_upgrade()
if completion:
while not completion.has_result:
self.process([completion])
self._save_inventory()
return 'Removed label %s from host %s' % (label, host)
+ def _get_daemons_by_type(self, daemon_type):
+ # type: (str) -> List[orchestrator.DaemonDescription]
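+ # scan the per-host daemon cache for names of the form '<daemon_type>.<id>'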
+ result = [] # type: List[orchestrator.DaemonDescription]
+ for host, di in self.daemon_cache.items():
+ for name, d in di['daemons'].items():
+ if name.startswith(daemon_type + '.'):
+ result.append(d)
+ return result
+
def _refresh_host_daemons(self, host):
out, err, code = self._run_cephadm(
host, 'mon', 'ls', [], no_fsid=True)
return "Removed {} from host '{}'".format(name, host)
def _update_service(self, daemon_type, add_func, spec):
- def ___update_service(daemons):
- if len(daemons) > spec.count:
- # remove some
- to_remove = len(daemons) - spec.count
- args = []
- for d in daemons[0:to_remove]:
- args.append(
- ('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename)
- )
- return self._remove_daemon(args)
- elif len(daemons) < spec.count:
- return add_func(spec)
- return []
- return self._get_daemons(daemon_type, service_name=spec.name).then(___update_service)
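+ # scale the daemons of this type up or down until they match spec.count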
+ daemons = self._get_daemons_by_type(daemon_type)
+ if len(daemons) > spec.count:
+ # remove some
+ to_remove = len(daemons) - spec.count
+ args = []
+ for d in daemons[0:to_remove]:
+ args.append(
+ ('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename)
+ )
+ return self._remove_daemon(args)
+ elif len(daemons) < spec.count:
+ return add_func(spec)
+ return trivial_result([])
def _add_new_daemon(self,
daemon_type: str,
- daemons: List[orchestrator.DaemonDescription],
spec: orchestrator.ServiceSpec,
create_func: Callable):
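+ # daemons of this type that already exist on the cluster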
+ daemons = self._get_daemons_by_type(daemon_type)
args = []
num_added = 0
assert spec.count is not None
if not network:
raise RuntimeError("Host '{}' is missing a network spec".format(host))
- def add_mons(daemons):
- for _, _, name in spec.placement.hosts:
- if name and len([d for d in daemons if d.daemon_id == name]):
- raise RuntimeError('name %s already exists', name)
-
- # explicit placement: enough hosts provided?
- if len(spec.placement.hosts) < spec.count:
- raise RuntimeError("Error: {} hosts provided, expected {}".format(
- len(spec.placement.hosts), spec.count))
- self.log.info("creating {} monitors on hosts: '{}'".format(
- spec.count, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
- # TODO: we may want to chain the creation of the monitors so they join
- # the quorum one at a time.
- return self._create_mon(spec.placement.hosts)
-
- return self._get_daemons('mon').then(add_mons)
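+ # reject names that collide with existing mon daemons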
+ daemons = self._get_daemons_by_type('mon')
+ for _, _, name in spec.placement.hosts:
+ if name and len([d for d in daemons if d.daemon_id == name]):
+ raise RuntimeError('name %s already exists' % name)
+
+ # explicit placement: enough hosts provided?
+ if len(spec.placement.hosts) < spec.count:
+ raise RuntimeError("Error: {} hosts provided, expected {}".format(
+ len(spec.placement.hosts), spec.count))
+ self.log.info("creating {} monitors on hosts: '{}'".format(
+ spec.count, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
+ # TODO: we may want to chain the creation of the monitors so they join
+ # the quorum one at a time.
+ return self._create_mon(spec.placement.hosts)
def apply_mon(self, spec):
# type: (orchestrator.ServiceSpec) -> orchestrator.Completion
if not network:
raise RuntimeError("Host '{}' is missing a network spec".format(host))
- def update_mons_with_daemons(daemons):
- for _, _, name in spec.placement.hosts:
- if name and len([d for d in daemons if d.daemon_id == name]):
- raise RuntimeError('name %s alrady exists', name)
-
- # explicit placement: enough hosts provided?
- num_new_mons = spec.count - num_mons
- if len(spec.placement.hosts) < num_new_mons:
- raise RuntimeError("Error: {} hosts provided, expected {}".format(
- len(spec.placement.hosts), num_new_mons))
- self.log.info("creating {} monitors on hosts: '{}'".format(
- num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
- # TODO: we may want to chain the creation of the monitors so they join
- # the quorum one at a time.
- return self._create_mon(spec.placement.hosts)
- return self._get_daemons('mon').then(update_mons_with_daemons)
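+ # as above, reject names that collide with existing mon daemons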
+ daemons = self._get_daemons_by_type('mon')
+ for _, _, name in spec.placement.hosts:
+ if name and len([d for d in daemons if d.daemon_id == name]):
+ raise RuntimeError('name %s already exists' % name)
+
+ # explicit placement: enough hosts provided?
+ num_new_mons = spec.count - num_mons
+ if len(spec.placement.hosts) < num_new_mons:
+ raise RuntimeError("Error: {} hosts provided, expected {}".format(
+ len(spec.placement.hosts), num_new_mons))
+ self.log.info("creating {} monitors on hosts: '{}'".format(
+ num_new_mons, ",".join(map(lambda h: ":".join(h), spec.placement.hosts))))
+ # TODO: we may want to chain the creation of the monitors so they join
+ # the quorum one at a time.
+ return self._create_mon(spec.placement.hosts)
@async_map_completion
def _create_mgr(self, mgr_id, host):
return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
- @with_daemons('mgr')
- def add_mgr(self, spec, daemons):
- # type: (orchestrator.ServiceSpec, List[orchestrator.DaemonDescription]) -> orchestrator.Completion
+ def add_mgr(self, spec):
+ # type: (orchestrator.ServiceSpec) -> orchestrator.Completion
spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mgr').load()
- return self._add_new_daemon('mgr', daemons, spec, self._create_mgr)
+ return self._add_new_daemon('mgr', spec, self._create_mgr)
- @with_daemons('mgr')
- def apply_mgr(self, spec, daemons):
- # type: (orchestrator.ServiceSpec, List[orchestrator.DaemonDescription]) -> orchestrator.Completion
+ def apply_mgr(self, spec):
+ # type: (orchestrator.ServiceSpec) -> orchestrator.Completion
"""
Adjust the number of cluster managers.
"""
spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mgr').load()
+ daemons = self._get_daemons_by_type('mgr')
num_mgrs = len(daemons)
if spec.count == num_mgrs:
return orchestrator.Completion(value="The requested number of managers exist.")
'value': spec.name,
})
- def _add_mds(daemons):
- # type: (List[orchestrator.DaemonDescription]) -> AsyncCompletion
- return self._add_new_daemon('mds', daemons, spec, self._create_mds)
-
- return self._get_daemons('mds').then(_add_mds)
+ return self._add_new_daemon('mds', spec, self._create_mds)
def apply_mds(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
'value': spec.rgw_realm,
})
- def _add_rgw(daemons):
- return self._add_new_daemon('rgw', daemons, spec, self._create_rgw)
-
- return self._get_daemons('rgw').then(_add_rgw)
+ return self._add_new_daemon('rgw', spec, self._create_rgw)
@async_map_completion
def _create_rgw(self, rgw_id, host):
raise RuntimeError("must specify at least %d hosts" % spec.count)
self.log.debug('nodes %s' % spec.placement.hosts)
- def _add_rbd_mirror(daemons):
- return self._add_new_daemon('rbd-mirror', daemons, spec, self._create_rbd_mirror)
-
- return self._get_daemons('rbd-mirror').then(_add_rbd_mirror)
+ return self._add_new_daemon('rbd-mirror', spec, self._create_rbd_mirror)
@async_map_completion
def _create_rbd_mirror(self, daemon_id, host):
def add_prometheus(self, spec):
spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='prometheus').load()
- self.log.debug('nodes %s' % spec.placement.hosts)
-
- def _add(daemons):
- args = []
- num_added = 0
- for host, _, name in spec.placement.hosts:
- if num_added >= spec.count:
- break
- daemon_id = self.get_unique_name(host, daemons, None, name)
- self.log.debug('placing prometheus.%s on host %s' % (daemon_id,
- host))
- args.append((daemon_id, host))
-
- # add to daemon list so next name(s) will also be unique
- sd = orchestrator.ServiceDescription()
- sd.service_instance = daemon_id
- sd.service_type = 'prometheus'
- sd.nodename = host
- daemons.append(sd)
- num_added += 1
- return self._create_prometheus(args)
-
- return self._get_daemons('prometheus').then(_add)
+ return self._add_new_daemon('prometheus', spec, self._create_prometheus)
@async_map_completion
def _create_prometheus(self, daemon_id, host):
target_name = image
else:
raise OrchestratorError('must specify either image or version')
- return self._get_daemons().then(
- lambda daemons: self._upgrade_check(target_name, daemons))
- def _upgrade_check(self, target_name, daemons):
- # get service state
target_id, target_version = self._get_container_image_id(target_name)
self.log.debug('Target image %s id %s version %s' % (
target_name, target_id, target_version))
'needs_update': dict(),
'up_to_date': list(),
}
- for s in daemons:
- if target_id == s.container_image_id:
- r['up_to_date'].append(s.name())
- else:
- r['needs_update'][s.name()] = {
- 'current_name': s.container_image_name,
- 'current_id': s.container_image_id,
- 'current_version': s.version,
- }
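+ # compare each cached daemon's container image id against the target image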
+ for host, di in self.daemon_cache.items():
+ for name, dd in di['daemons'].items():
+ if target_id == dd.container_image_id:
+ r['up_to_date'].append(dd.name())
+ else:
+ r['needs_update'][dd.name()] = {
+ 'current_name': dd.container_image_name,
+ 'current_id': dd.container_image_id,
+ 'current_version': dd.version,
+ }
return json.dumps(r, indent=4, sort_keys=True)
-
def upgrade_status(self):
r = orchestrator.UpgradeStatusSpec()
if self.upgrade_state: