def _name_to_entity_name(name):
"""
- Map from service names to ceph entity names (as seen in config)
+ Map from daemon names to ceph entity names (as seen in config)
"""
if name.startswith('rgw.') or name.startswith('rbd-mirror'):
return 'client.' + name
    return name

def trivial_result(val):
    return AsyncCompletion(value=val, name='trivial_result')
-def with_services(service_type=None,
- service_name=None,
- service_id=None,
- node_name=None,
- refresh=False):
+def with_daemons(daemon_type=None,
+ daemon_id=None,
+ service_name=None,
+ host=None,
+ refresh=False):
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
- def on_complete(services):
+ def on_complete(daemons):
if kwargs:
- kwargs['services'] = services
+ kwargs['daemons'] = daemons
return func(self, *args, **kwargs)
else:
- args_ = args + (services,)
+ args_ = args + (daemons,)
return func(self, *args_, **kwargs)
- return self._get_services(service_type=service_type,
- service_name=service_name,
- service_id=service_id,
- node_name=node_name,
- refresh=refresh).then(on_complete)
+ return self._get_daemons(daemon_type=daemon_type,
+ daemon_id=daemon_id,
+ service_name=service_name,
+ host=host,
+ refresh=refresh).then(on_complete)
return wrapper
return decorator
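A minimal, self-contained sketch of the completion-chaining pattern the decorator relies on, with a toy Completion standing in for the real AsyncCompletion (all names here are illustrative, not part of the patch):

    from functools import wraps

    class Completion:
        # toy stand-in: holds a resolved value and chains callbacks
        def __init__(self, value):
            self.value = value

        def then(self, fn):
            return Completion(fn(self.value))

    def with_items(get_items):
        # get_items(self) must return a Completion; the wrapped method
        # receives the resolved value as a trailing positional argument
        def decorator(func):
            @wraps(func)
            def wrapper(self, *args, **kwargs):
                def on_complete(items):
                    return func(self, *(args + (items,)), **kwargs)
                return get_items(self).then(on_complete)
            return wrapper
        return decorator

    class Demo:
        def _get_daemons(self):
            return Completion(['osd.0', 'osd.1'])

        @with_items(lambda self: self._get_daemons())
        def show(self, daemons):
            return daemons

    assert Demo().show().value == ['osd.0', 'osd.1']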
'desc': 'seconds to cache device inventory',
},
{
- 'name': 'service_cache_timeout',
+ 'name': 'daemon_cache_timeout',
'type': 'secs',
'default': 60,
-        'desc': 'seconds to cache service (daemon) inventory',
+        'desc': 'seconds to cache daemon inventory',
'name': 'warn_on_stray_hosts',
'type': 'bool',
'default': True,
- 'desc': 'raise a health warning if services are detected on a host '
+ 'desc': 'raise a health warning if daemons are detected on a host '
'that is not managed by cephadm',
},
{
- 'name': 'warn_on_stray_services',
+ 'name': 'warn_on_stray_daemons',
'type': 'bool',
'default': True,
- 'desc': 'raise a health warning if services are detected '
+ 'desc': 'raise a health warning if daemons are detected '
'that are not managed by cephadm',
},
{
if TYPE_CHECKING:
self.ssh_config_file = None # type: Optional[str]
self.inventory_cache_timeout = 0
- self.service_cache_timeout = 0
+ self.daemon_cache_timeout = 0
self.mode = ''
self.container_image_base = ''
self.warn_on_stray_hosts = True
- self.warn_on_stray_services = True
+ self.warn_on_stray_daemons = True
self.warn_on_failed_host_check = True
self._cons = {} # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
self.inventory_cache = orchestrator.OutdatablePersistentDict(
self, self._STORE_HOST_PREFIX + '.devices')
- self.service_cache = orchestrator.OutdatablePersistentDict(
- self, self._STORE_HOST_PREFIX + '.services')
+ self.daemon_cache = orchestrator.OutdatablePersistentDict(
+ self, self._STORE_HOST_PREFIX + '.daemons')
# ensure the host lists are in sync
for h in self.inventory.keys():
if h not in self.inventory_cache:
self.log.debug('adding inventory item for %s' % h)
self.inventory_cache[h] = orchestrator.OutdatableData()
- if h not in self.service_cache:
+ if h not in self.daemon_cache:
-                self.log.debug('adding daemon item for %s' % h)
+                self.log.debug('adding daemon item for %s' % h)
- self.service_cache[h] = orchestrator.OutdatableData()
+ self.daemon_cache[h] = orchestrator.OutdatableData()
for h in self.inventory_cache:
if h not in self.inventory:
del self.inventory_cache[h]
- for h in self.service_cache:
+ for h in self.daemon_cache:
if h not in self.inventory:
- del self.service_cache[h]
+ del self.daemon_cache[h]
def shutdown(self):
self.log.info('shutdown')
-        # only wait a little bit; the service might go away for something
+        # only wait a little bit; the daemon might go away for some other reason
tries = 4
while tries > 0:
- if s.service_type not in ['mon', 'osd', 'mds']:
+ if s.daemon_type not in ['mon', 'osd', 'mds']:
break
ret, out, err = self.mon_command({
- 'prefix': '%s ok-to-stop' % s.service_type,
- 'ids': [s.service_instance],
+ 'prefix': '%s ok-to-stop' % s.daemon_type,
+ 'ids': [s.daemon_id],
})
if not self.upgrade_state or self.upgrade_state.get('paused'):
return False
if ret:
self.log.info('Upgrade: It is NOT safe to stop %s.%s' %
- (s.service_type, s.service_instance))
+ (s.daemon_type, s.daemon_id))
time.sleep(15)
tries -= 1
else:
self.log.info('Upgrade: It is safe to stop %s.%s' %
- (s.service_type, s.service_instance))
+ (s.daemon_type, s.daemon_id))
return True
self.log.info('Upgrade: It is safe to stop %s.%s' %
- (s.service_type, s.service_instance))
+ (s.daemon_type, s.daemon_id))
return True
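The loop above delays an upgrade but never blocks it: after four failed probes it proceeds anyway, and daemon types outside mon/osd/mds skip the check entirely. Distilled (ok_to_stop is a hypothetical callable standing in for the '<type> ok-to-stop' mon command; the paused-upgrade early return is omitted):

    import time

    def wait_for_ok_to_stop(daemon_type, ok_to_stop, tries=4, delay=15):
        while tries > 0:
            if daemon_type not in ['mon', 'osd', 'mds']:
                break  # other daemon types are always safe to stop
            if ok_to_stop():
                return True
            time.sleep(delay)
            tries -= 1
        # retries exhausted (or type not gated): proceed anyway
        return True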
def _clear_upgrade_health_checks(self):
self.set_health_checks(self.health_checks)
def _do_upgrade(self, daemons):
- # type: (List[orchestrator.ServiceDescription]) -> Optional[AsyncCompletion]
+ # type: (List[orchestrator.DaemonDescription]) -> Optional[AsyncCompletion]
if not self.upgrade_state:
self.log.debug('_do_upgrade no state, exiting')
return None
self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
need_upgrade_self = False
for d in daemons:
- if d.service_type != daemon_type:
+ if d.daemon_type != daemon_type:
continue
if not d.container_image_id:
self.log.debug('daemon %s.%s image_id is not known' % (
- daemon_type, d.service_instance))
+ daemon_type, d.daemon_id))
return None
if d.container_image_id == target_id:
self.log.debug('daemon %s.%s version correct' % (
- daemon_type, d.service_instance))
+ daemon_type, d.daemon_id))
continue
self.log.debug('daemon %s.%s version incorrect (%s, %s)' % (
- daemon_type, d.service_instance,
+ daemon_type, d.daemon_id,
d.container_image_id, d.version))
if daemon_type == 'mgr' and \
- d.service_instance == self.get_mgr_id():
+ d.daemon_id == self.get_mgr_id():
self.log.info('Upgrade: Need to upgrade myself (mgr.%s)' %
self.get_mgr_id())
need_upgrade_self = True
if not self._wait_for_ok_to_stop(d):
return None
self.log.info('Upgrade: Redeploying %s.%s' %
- (d.service_type, d.service_instance))
+ (d.daemon_type, d.daemon_id))
ret, out, err = self.mon_command({
'prefix': 'config set',
'name': 'container_image',
'value': target_name,
- 'who': daemon_type + '.' + d.service_instance,
+ 'who': daemon_type + '.' + d.daemon_id,
})
return self._daemon_action([(
- d.service_type,
- d.service_instance,
+ d.daemon_type,
+ d.daemon_id,
d.nodename,
'redeploy'
)])
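For reference, the per-daemon image pin issued just before the redeploy is a plain 'config set' scoped to that daemon; its shape, with illustrative values:

    cmd = {
        'prefix': 'config set',
        'name': 'container_image',
        'value': 'docker.io/ceph/ceph:v15',  # hypothetical target_name
        'who': 'osd.3',                      # daemon_type + '.' + daemon_id
    }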
}
self.set_health_checks(self.health_checks)
- def _check_for_strays(self, services):
+ def _check_for_strays(self, daemons):
self.log.debug('_check_for_strays')
for k in ['CEPHADM_STRAY_HOST',
- 'CEPHADM_STRAY_SERVICE']:
+ 'CEPHADM_STRAY_DAEMON']:
if k in self.health_checks:
del self.health_checks[k]
- if self.warn_on_stray_hosts or self.warn_on_stray_services:
+ if self.warn_on_stray_hosts or self.warn_on_stray_daemons:
ls = self.list_servers()
managed = []
- for s in services:
+ for s in daemons:
managed.append(s.name())
-            self.log.debug('cephadm services %s' % managed)
+            self.log.debug('cephadm daemons %s' % managed)
host_detail = [] # type: List[str]
- host_num_services = 0
- service_detail = [] # type: List[str]
+ host_num_daemons = 0
+ daemon_detail = [] # type: List[str]
for item in ls:
host = item.get('hostname')
- services = item.get('services')
+ daemons = item.get('services') # misnomer!
missing_names = []
- for s in services:
+ for s in daemons:
name = '%s.%s' % (s.get('type'), s.get('id'))
if host not in self.inventory:
missing_names.append(name)
- host_num_services += 1
+ host_num_daemons += 1
if name not in managed:
- service_detail.append(
- 'stray service %s on host %s not managed by cephadm' % (name, host))
+ daemon_detail.append(
+ 'stray daemon %s on host %s not managed by cephadm' % (name, host))
if missing_names:
host_detail.append(
'stray host %s has %d stray daemons: %s' % (
if host_detail:
self.health_checks['CEPHADM_STRAY_HOST'] = {
'severity': 'warning',
- 'summary': '%d stray host(s) with %s service(s) '
+                'summary': '%d stray host(s) with %d daemon(s) '
'not managed by cephadm' % (
- len(host_detail), host_num_services),
+ len(host_detail), host_num_daemons),
'count': len(host_detail),
'detail': host_detail,
}
- if service_detail:
- self.health_checks['CEPHADM_STRAY_SERVICE'] = {
+ if daemon_detail:
+ self.health_checks['CEPHADM_STRAY_DAEMON'] = {
'severity': 'warning',
- 'summary': '%d stray service(s) not managed by cephadm' % (
- len(service_detail)),
- 'count': len(service_detail),
- 'detail': service_detail,
+                'summary': '%d stray daemon(s) not managed by cephadm' % (
+ len(daemon_detail)),
+ 'count': len(daemon_detail),
+ 'detail': daemon_detail,
}
self.set_health_checks(self.health_checks)
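The stray check boils down to a set comparison between what each host reports and what cephadm manages; in miniature (sample data only):

    managed = {'mon.a', 'osd.0'}
    reported = [{'type': 'osd', 'id': '0'}, {'type': 'osd', 'id': '7'}]
    strays = ['%s.%s' % (d['type'], d['id']) for d in reported
              if '%s.%s' % (d['type'], d['id']) not in managed]
    assert strays == ['osd.7']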
while self.run:
self._check_hosts()
- # refresh services
- self.log.debug('refreshing services')
- completion = self._get_services(maybe_refresh=True)
+ # refresh daemons
+ self.log.debug('refreshing daemons')
+ completion = self._get_daemons(maybe_refresh=True)
self._orchestrator_wait([completion])
# FIXME: this is a band-aid to avoid crashing the mgr, but what
# we really need to do here is raise health alerts for individual
# hosts that fail and continue with the ones that do not fail.
if completion.exception is not None:
- self.log.error('failed to refresh services: %s' % completion.exception)
+ self.log.error('failed to refresh daemons: %s' % completion.exception)
self.health_checks['CEPHADM_REFRESH_FAILED'] = {
'severity': 'warning',
'summary': 'failed to probe one or more hosts',
if 'CEPHADM_REFRESH_FAILED' in self.health_checks:
del self.health_checks['CEPHADM_REFRESH_FAILED']
self.set_health_checks(self.health_checks)
- services = completion.result
- self.log.debug('services %s' % services)
+ daemons = completion.result
+ self.log.debug('daemons %s' % daemons)
- self._check_for_strays(services)
+ self._check_for_strays(daemons)
if self.upgrade_state and not self.upgrade_state.get('paused'):
- completion = self._do_upgrade(services)
+ completion = self._do_upgrade(daemons)
if completion:
while not completion.has_result:
self.process([completion])
pass
def get_unique_name(self, host, existing, prefix=None, forcename=None):
- # type: (str, List[orchestrator.ServiceDescription], Optional[str], Optional[str]) -> str
+ # type: (str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
"""
-        Generate a unique random service name
+        Generate a unique random daemon name
"""
if forcename:
- if len([d for d in existing if d.service_instance == forcename]):
+ if len([d for d in existing if d.daemon_id == forcename]):
-            raise RuntimeError('specified name %s already in use', forcename)
+            raise RuntimeError('specified name %s already in use' % forcename)
return forcename
name = ''
name += host + '.' + ''.join(random.choice(string.ascii_lowercase)
for _ in range(6))
- if len([d for d in existing if d.service_instance == name]):
+ if len([d for d in existing if d.daemon_id == name]):
-            self.log('name %s exists, trying again', name)
+            self.log.debug('name %s exists, trying again', name)
continue
return name
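Stripped of the orchestrator types, the naming scheme is the host name plus six random lowercase letters, retried until it collides with nothing (stand-alone sketch):

    import random
    import string

    def unique_name(host, existing_ids):
        while True:
            name = host + '.' + ''.join(random.choice(string.ascii_lowercase)
                                        for _ in range(6))
            if name not in existing_ids:
                return name

    print(unique_name('node1', {'node1.abcdef'}))  # e.g. 'node1.kqzwlx'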
}
self._save_inventory()
self.inventory_cache[spec.hostname] = orchestrator.OutdatableData()
- self.service_cache[spec.hostname] = orchestrator.OutdatableData()
+ self.daemon_cache[spec.hostname] = orchestrator.OutdatableData()
self.event.set() # refresh stray health check
return "Added host '{}'".format(spec.hostname)
del self.inventory[host]
self._save_inventory()
del self.inventory_cache[host]
- del self.service_cache[host]
+ del self.daemon_cache[host]
self._reset_con(host)
self.event.set() # refresh stray health check
return "Removed host '{}'".format(host)
return 'Removed label %s from host %s' % (label, host)
@async_map_completion
- def _refresh_host_services(self, host):
+ def _refresh_host_daemons(self, host):
out, err, code = self._run_cephadm(
host, 'mon', 'ls', [], no_fsid=True)
data = json.loads(''.join(out))
for d in data:
d['last_refresh'] = datetime.datetime.utcnow().strftime(DATEFMT)
- self.log.debug('Refreshed host %s services: %s' % (host, data))
- self.service_cache[host] = orchestrator.OutdatableData(data)
+ self.log.debug('Refreshed host %s daemons: %s' % (host, data))
+ self.daemon_cache[host] = orchestrator.OutdatableData(data)
return host, data
- def _get_services(self,
- service_type=None,
- service_name=None,
- service_id=None,
- node_name=None,
- refresh=False,
- maybe_refresh=False):
+ def _get_daemons(self,
+ daemon_type=None,
+ daemon_id=None,
+ service_name=None,
+ host=None,
+ refresh=False,
+ maybe_refresh=False):
hosts = []
wait_for_args = []
- services = {}
+ daemons = {}
keys = None
- if node_name is not None:
- keys = [node_name]
- for host, host_info in self.service_cache.items_filtered(keys):
+ if host is not None:
+ keys = [host]
+ for host, host_info in self.daemon_cache.items_filtered(keys):
hosts.append(host)
if refresh:
- self.log.info("refreshing services for '{}'".format(host))
+ self.log.info("refreshing daemons for '{}'".format(host))
wait_for_args.append((host,))
- elif maybe_refresh and host_info.outdated(self.service_cache_timeout): # type: ignore
- self.log.info("refreshing stale services for '{}'".format(host))
+ elif maybe_refresh and host_info.outdated(self.daemon_cache_timeout): # type: ignore
+ self.log.info("refreshing stale daemons for '{}'".format(host))
wait_for_args.append((host,))
elif not host_info.last_refresh:
- services[host] = [
+ daemons[host] = [
{
- 'name': '*',
- 'service_type': '*',
- 'service_instance': '*',
+ 'name': '*.*',
'style': 'cephadm:v1',
'fsid': self._cluster_fsid,
},
]
else:
- self.log.debug('have recent services for %s: %s' % (
+ self.log.debug('have recent daemons for %s: %s' % (
host, host_info.data))
- services[host] = host_info.data
+ daemons[host] = host_info.data
- def _get_services_result(results):
+ def _get_daemons_result(results):
for host, data in results:
- services[host] = data
+ daemons[host] = data
result = []
- for host, ls in services.items():
+ for host, ls in daemons.items():
for d in ls:
if not d['style'].startswith('cephadm'):
-                self.log.debug('ignoring non-cephadm on %s: %s' % (host, d))
+                self.log.debug('ignoring foreign daemon on %s: %s' % (host, d))
continue
self.log.debug('including %s %s' % (host, d))
- sd = orchestrator.ServiceDescription()
+ sd = orchestrator.DaemonDescription()
if 'last_refresh' in d:
sd.last_refresh = datetime.datetime.strptime(
d['last_refresh'], DATEFMT)
- sd.service_type = d['name'].split('.')[0]
- if service_type and service_type != sd.service_type:
+ if '.' not in d['name']:
+ self.log.debug('ignoring dot-less daemon on %s: %s' % (host, d))
continue
- if '.' in d['name']:
- sd.service_instance = '.'.join(d['name'].split('.')[1:])
- elif d['name'] != '*':
- sd.service_instance = host
- if service_id and service_id != sd.service_instance:
+ sd.daemon_type = d['name'].split('.')[0]
+ if daemon_type and daemon_type != sd.daemon_type:
continue
- if service_name and not sd.service_instance.startswith(service_name + '.'):
+ sd.daemon_id = '.'.join(d['name'].split('.')[1:])
+ if daemon_id and daemon_id != sd.daemon_id:
+ continue
+ if service_name and not sd.daemon_id.startswith(service_name + '.'):
continue
                    sd.nodename = host
                    sd.container_id = d.get('container_id')
                    result.append(sd)
            return result
if wait_for_args:
- return self._refresh_host_services(wait_for_args).then(
- _get_services_result)
+ return self._refresh_host_daemons(wait_for_args).then(
+ _get_daemons_result)
else:
- return trivial_result(_get_services_result({}))
-
- def describe_service(self, service_type=None, service_id=None,
- node_name=None, refresh=False):
- if service_type not in ("mds", "osd", "mgr", "mon", 'rgw', "nfs", None):
- raise orchestrator.OrchestratorValidationError(
- service_type + " unsupported")
- result = self._get_services(service_type,
- service_id=service_id,
- node_name=node_name,
- refresh=refresh)
+ return trivial_result(_get_daemons_result({}))
+
+# def describe_service(self, service_type=None, service_id=None,
+# node_name=None, refresh=False):
+# if service_type not in ("mds", "osd", "mgr", "mon", 'rgw', "nfs", None):
+# raise orchestrator.OrchestratorValidationError(
+# service_type + " unsupported")
+# result = self._get_daemons(service_type,
+# service_id=service_id,
+# node_name=node_name,
+# refresh=refresh)
+# return result
+
+ def list_daemons(self, daemon_type=None, daemon_id=None,
+ host=None, refresh=False):
+ result = self._get_daemons(daemon_type=daemon_type,
+ daemon_id=daemon_id,
+ host=host,
+ refresh=refresh)
return result
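The filters above key off the cephadm unit-name convention: everything before the first dot is the daemon type, and the remainder, which may itself contain dots, is the daemon id. With a sample name:

    name = 'rgw.myrealm.myzone.a'
    daemon_type = name.split('.')[0]           # 'rgw'
    daemon_id = '.'.join(name.split('.')[1:])  # 'myrealm.myzone.a'
    assert (daemon_type, daemon_id) == ('rgw', 'myrealm.myzone.a')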
def service_action(self, action, service_type, service_name):
def _proc_daemons(daemons):
args = []
for d in daemons:
- args.append((d.service_type, d.service_instance,
+ args.append((d.daemon_type, d.daemon_id,
d.nodename, action))
if not args:
raise orchestrator.OrchestratorError(
service_type, service_name))
return self._daemon_action(args)
-        return self._get_services(
-            service_type,
+        return self._get_daemons(
+            daemon_type=service_type,
            service_name=service_name).then(_proc_daemons)
host, name, 'unit',
['--name', name, a],
error_ok=True)
- self.service_cache.invalidate(host)
+ self.daemon_cache.invalidate(host)
self.log.debug('_daemon_action code %s out %s' % (code, out))
return "{} {} from host '{}'".format(action, name, host)
def _proc_daemons(daemons):
args = []
for d in daemons:
- args.append((d.service_type, d.service_instance,
+ args.append((d.daemon_type, d.daemon_id,
d.nodename, action))
if not args:
raise orchestrator.OrchestratorError(
daemon_type, daemon_id))
return self._daemon_action(args)
- return self._get_services(
- service_type=daemon_type,
- service_id=daemon_id).then(_proc_daemons)
+ return self._get_daemons(
+ daemon_type=daemon_type,
+ daemon_id=daemon_id).then(_proc_daemons)
def get_inventory(self, node_filter=None, refresh=False):
"""
return "Created osd(s) on host '{}'".format(host)
- @with_services('osd')
- def remove_osds(self, osd_ids, services):
- # type: (List[str], List[orchestrator.ServiceDescription]) -> AsyncCompletion
- args = [(d.name(), d.nodename) for d in services if
- d.service_instance in osd_ids]
+ @with_daemons('osd')
+ def remove_osds(self, osd_ids, daemons):
+ # type: (List[str], List[orchestrator.DaemonDescription]) -> AsyncCompletion
+ args = [(d.name(), d.nodename) for d in daemons if
+ d.daemon_id in osd_ids]
found = list(zip(*args))[0] if args else []
not_found = {osd_id for osd_id in osd_ids if 'osd.%s' % osd_id not in found}
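The zip(*args) idiom above transposes the (name, host) pairs so the first tuple holds every matched daemon name; with sample data:

    args = [('osd.0', 'host1'), ('osd.3', 'host2')]
    names, hosts = zip(*args)
    assert names == ('osd.0', 'osd.3') and hosts == ('host1', 'host2')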
] + extra_args,
stdin=j)
self.log.debug('create_daemon code %s out %s' % (code, out))
- if not code and host in self.service_cache:
+ if not code and host in self.daemon_cache:
-            # prime cached service state with what we (should have)
+            # prime cached daemon state with what we (should have)
# just created
            sd = {
                'name': '%s.%s' % (daemon_type, daemon_id),
                'style': 'cephadm:v1',
                'fsid': self._cluster_fsid,
                'enabled': True,
                'state': 'running',
            }
- data = self.service_cache[host].data
+ data = self.daemon_cache[host].data
if data:
- data = [d for d in data if d['name'] != sd['name']]
+                data = [d for d in data if d['name'] != '%s.%s' % (daemon_type, daemon_id)]
data.append(sd)
else:
data = [sd]
- self.service_cache[host] = orchestrator.OutdatableData(data)
- self.service_cache.invalidate(host)
+ self.daemon_cache[host] = orchestrator.OutdatableData(data)
+ self.daemon_cache.invalidate(host)
self.event.set()
return "{} {} on host '{}'".format(
'Reconfigured' if reconfig else 'Deployed', name, host)
host, name, 'rm-daemon',
['--name', name])
self.log.debug('_remove_daemon code %s out %s' % (code, out))
- if not code and host in self.service_cache:
+ if not code and host in self.daemon_cache:
# remove item from cache
- data = self.service_cache[host].data
+ data = self.daemon_cache[host].data
if data:
data = [d for d in data if d['name'] != name]
- self.service_cache[host] = orchestrator.OutdatableData(data)
- self.service_cache.invalidate(host)
+ self.daemon_cache[host] = orchestrator.OutdatableData(data)
+ self.daemon_cache.invalidate(host)
return "Removed {} from host '{}'".format(name, host)
def _update_service(self, daemon_type, add_func, spec):
args = []
for d in daemons[0:to_remove]:
args.append(
- ('%s.%s' % (d.service_type, d.service_instance), d.nodename)
+ ('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename)
)
return self._remove_daemon(args)
elif len(daemons) < spec.count:
return add_func(spec)
return []
- return self._get_services(daemon_type, service_name=spec.name).then(___update_service)
+ return self._get_daemons(daemon_type, service_name=spec.name).then(___update_service)
- def _add_new_daemon(self, svc_type: str, daemons: List[orchestrator.ServiceDescription],
+ def _add_new_daemon(self,
+ daemon_type: str,
+ daemons: List[orchestrator.DaemonDescription],
spec: orchestrator.ServiceSpec,
create_func: Callable):
args = []
num_added = 0
assert spec.count is not None
- prefix = f'{svc_type}.{spec.name}'
+ prefix = f'{daemon_type}.{spec.name}'
our_daemons = [d for d in daemons if d.name().startswith(prefix)]
hosts_with_daemons = {d.nodename for d in daemons}
hosts_without_daemons = {p for p in spec.placement.hosts if p.hostname not in hosts_with_daemons}
for host, _, name in hosts_without_daemons:
if (len(our_daemons) + num_added) >= spec.count:
break
- svc_id = self.get_unique_name(host, daemons, spec.name, name)
- self.log.debug('placing %s.%s on host %s' % (svc_type, svc_id, host))
- args.append((svc_id, host))
+ daemon_id = self.get_unique_name(host, daemons, spec.name, name)
+ self.log.debug('placing %s.%s on host %s' % (daemon_type, daemon_id, host))
+ args.append((daemon_id, host))
# add to daemon list so next name(s) will also be unique
- sd = orchestrator.ServiceDescription(
+ sd = orchestrator.DaemonDescription(
nodename=host,
- service_type=svc_type,
- service_instance=svc_id,
+ daemon_type=daemon_type,
+ daemon_id=daemon_id,
)
daemons.append(sd)
num_added += 1
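The placement loop above is a fill-to-count pass over hosts that do not yet run one of our daemons; its shape reduced to primitives (toy data):

    count = 3
    existing = ['mds.a.x']               # daemons already deployed
    hosts_without_daemons = ['h2', 'h3', 'h4']
    placed = []
    for host in hosts_without_daemons:
        if len(existing) + len(placed) >= count:
            break
        placed.append(host)
    assert placed == ['h2', 'h3']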
def update_mons_with_daemons(daemons):
for _, _, name in spec.placement.hosts:
- if name and len([d for d in daemons if d.service_instance == name]):
+ if name and len([d for d in daemons if d.daemon_id == name]):
-                raise RuntimeError('name %s alrady exists', name)
+                raise RuntimeError('name %s already exists' % name)
# explicit placement: enough hosts provided?
# TODO: we may want to chain the creation of the monitors so they join
# the quorum one at a time.
return self._create_mon(spec.placement.hosts)
- return self._get_services('mon').then(update_mons_with_daemons)
+ return self._get_daemons('mon').then(update_mons_with_daemons)
@async_map_completion
def _create_mgr(self, host, name):
return self._create_daemon('mgr', name, host, keyring=keyring)
- @with_services('mgr')
- def update_mgrs(self, spec, services):
- # type: (orchestrator.ServiceSpec, List[orchestrator.ServiceDescription]) -> orchestrator.Completion
+ @with_daemons('mgr')
+ def update_mgrs(self, spec, daemons):
+ # type: (orchestrator.ServiceSpec, List[orchestrator.DaemonDescription]) -> orchestrator.Completion
"""
Adjust the number of cluster managers.
"""
spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='mgr').load()
- num_mgrs = len(services)
+ num_mgrs = len(daemons)
if spec.count == num_mgrs:
-            return orchestrator.Completion(value="The requested number of managers exist.")
+            return orchestrator.Completion(value="The requested number of managers already exists.")
for standby in mgr_map.get('standbys', []):
connected.append(standby.get('name', ''))
-        to_remove_damons = []
-        for d in services:
-            if d.service_instance not in connected:
-                to_remove_damons.append(('%s.%s' % (d.service_type, d.service_instance),
+        to_remove_daemons = []
+        for d in daemons:
+            if d.daemon_id not in connected:
+                to_remove_daemons.append(('%s.%s' % (d.daemon_type, d.daemon_id),
                                          d.nodename))
num_to_remove -= 1
if num_to_remove == 0:
# otherwise, remove *any* mgr
if num_to_remove > 0:
-            for d in services:
-                to_remove_damons.append(('%s.%s' % (d.service_type, d.service_instance), d.nodename))
+            for d in daemons:
+                to_remove_daemons.append(('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename))
num_to_remove -= 1
if num_to_remove == 0:
break
len(spec.placement.hosts), num_new_mgrs))
for host_spec in spec.placement.hosts:
- if host_spec.name and len([d for d in services if d.service_instance == host_spec.name]):
+ if host_spec.name and len([d for d in daemons if d.daemon_id == host_spec.name]):
-                raise RuntimeError('name %s alrady exists', host_spec.name)
+                raise RuntimeError('name %s already exists' % host_spec.name)
self.log.info("creating {} managers on hosts: '{}'".format(
args = []
for host_spec in spec.placement.hosts:
host = host_spec.hostname
- name = host_spec.name or self.get_unique_name(host, services)
+ name = host_spec.name or self.get_unique_name(host, daemons)
args.append((host, name))
c = self._create_mgr(args)
c.add_progress('Creating MGRs', self)
})
def _add_mds(daemons):
- # type: (List[orchestrator.ServiceDescription]) -> AsyncCompletion
+ # type: (List[orchestrator.DaemonDescription]) -> AsyncCompletion
return self._add_new_daemon('mds', daemons, spec, self._create_mds)
- return self._get_services('mds').then(_add_mds)
+ return self._get_daemons('mds').then(_add_mds)
def update_mds(self, spec):
# type: (orchestrator.ServiceSpec) -> AsyncCompletion
def _remove_mds(daemons):
args = []
for d in daemons:
- if d.service_instance == name or d.service_instance.startswith(name + '.'):
+ if d.daemon_id == name or d.daemon_id.startswith(name + '.'):
args.append(
- ('%s.%s' % (d.service_type, d.service_instance), d.nodename)
+ ('%s.%s' % (d.daemon_type, d.daemon_id), d.nodename)
)
if not args:
raise OrchestratorError('Unable to find mds.%s[-*] daemon(s)' % name)
return self._remove_daemon(args)
- return self._get_services('mds').then(_remove_mds)
+ return self._get_daemons('mds').then(_remove_mds)
def add_rgw(self, spec):
if not spec.placement.hosts or len(spec.placement.hosts) < spec.count:
def _add_rgw(daemons):
return self._add_new_daemon('rgw', daemons, spec, self._create_rgw)
- return self._get_services('rgw').then(_add_rgw)
+ return self._get_daemons('rgw').then(_add_rgw)
@async_map_completion
def _create_rgw(self, rgw_id, host):
def _remove_rgw(daemons):
args = []
for d in daemons:
- if d.service_instance == name or d.service_instance.startswith(name + '.'):
- args.append(('%s.%s' % (d.service_type, d.service_instance),
+ if d.daemon_id == name or d.daemon_id.startswith(name + '.'):
+ args.append(('%s.%s' % (d.daemon_type, d.daemon_id),
d.nodename))
if args:
return self._remove_daemon(args)
raise RuntimeError('Unable to find rgw.%s[-*] daemon(s)' % name)
- return self._get_services('rgw').then(_remove_rgw)
+ return self._get_daemons('rgw').then(_remove_rgw)
def update_rgw(self, spec):
spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rgw').load()
def _add_rbd_mirror(daemons):
return self._add_new_daemon('rbd-mirror', daemons, spec, self._create_rbd_mirror)
- return self._get_services('rbd-mirror').then(_add_rbd_mirror)
+ return self._get_daemons('rbd-mirror').then(_add_rbd_mirror)
@async_map_completion
def _create_rbd_mirror(self, daemon_id, host):
def _remove_rbd_mirror(daemons):
args = []
for d in daemons:
- if not name or d.service_instance == name:
+ if not name or d.daemon_id == name:
args.append(
- ('%s.%s' % (d.service_type, d.service_instance),
+ ('%s.%s' % (d.daemon_type, d.daemon_id),
d.nodename)
)
if not args and name:
raise RuntimeError('Unable to find rbd-mirror.%s daemon' % name)
return self._remove_daemon(args)
- return self._get_services('rbd-mirror').then(_remove_rbd_mirror)
+ return self._get_daemons('rbd-mirror').then(_remove_rbd_mirror)
def update_rbd_mirror(self, spec):
spec = NodeAssignment(spec=spec, get_hosts_func=self._get_hosts, service_type='rbd-mirror').load()
target_name = image
else:
raise OrchestratorError('must specify either image or version')
- return self._get_services().then(
+ return self._get_daemons().then(
lambda daemons: self._upgrade_check(target_name, daemons))
- def _upgrade_check(self, target_name, services):
+ def _upgrade_check(self, target_name, daemons):
# get service state
target_id, target_version = self._get_container_image_id(target_name)
self.log.debug('Target image %s id %s version %s' % (
'needs_update': dict(),
'up_to_date': list(),
}
- for s in services:
+ for s in daemons:
if target_id == s.container_image_id:
r['up_to_date'].append(s.name())
else:
self.spec = spec # type: orchestrator.ServiceSpec
self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
self.get_hosts_func = get_hosts_func
- self.service_type = service_type
+ self.daemon_type = service_type
def load(self):
# type: () -> orchestrator.ServiceSpec
# host pool (assuming `count` is set)
if not self.spec.placement.label and not self.spec.placement.hosts and self.spec.placement.count:
logger.info("Found num spec. Looking for labeled nodes.")
- # TODO: actually query for labels (self.service_type)
+ # TODO: actually query for labels (self.daemon_type)
candidates = self.scheduler.place([x[0] for x in self.get_hosts_func()],
count=self.spec.placement.count)
# Not enough nodes to deploy on
if len(candidates) != self.spec.placement.count:
- raise OrchestratorValidationError("Cannot place {} services on {} hosts.".
+ raise OrchestratorValidationError("Cannot place {} daemons on {} hosts.".
format(self.spec.placement.count, len(candidates)))
logger.info('Assigning nodes to spec: {}'.format(candidates))
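A toy stand-in for the count-based placement path: draw count hosts from the candidate pool and fail fast when the pool is too small (SimpleScheduler's actual policy may differ, e.g. in ordering):

    import random

    def place(hosts, count):
        if count > len(hosts):
            raise ValueError('Cannot place %d daemons on %d hosts.'
                             % (count, len(hosts)))
        return random.sample(hosts, count)

    print(place(['h1', 'h2', 'h3'], 2))  # e.g. ['h3', 'h1']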