daemon_type=daemon_type,
)
- if image is not None:
- if action != 'redeploy':
- raise OrchestratorError(
- f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
- if daemon_type not in CEPH_TYPES:
- raise OrchestratorError(
- f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
- f'types are: {", ".join(CEPH_TYPES)}')
-
- self.check_mon_command({
- 'prefix': 'config set',
- 'name': 'container_image',
- 'value': image,
- 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
- })
+ self._daemon_action_set_image(action, image, daemon_type, daemon_id)
if action == 'redeploy':
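+ # Redeploying the mgr we are running in would kill this process; fail over
+ # to a standby instead (the fail-over restarts this mgr, so the return below
+ # is never reached).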
+ if self.daemon_is_self(daemon_type, daemon_id):
+ self.mgr_service.fail_over()
+ return # unreachable.
# stop, recreate the container+unit, then restart
return self._create_daemon(daemon_spec)
elif action == 'reconfig':
self.events.for_daemon(name, 'INFO', msg)
return msg
+ def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
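+ """Validate an image override and persist it via `config set container_image`
+ for the daemon's config section. Only `redeploy` of Ceph daemon types may set
+ a new image."""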
+ if image is not None:
+ if action != 'redeploy':
+ raise OrchestratorError(
+ f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
+ if daemon_type not in CEPH_TYPES:
+ raise OrchestratorError(
+ f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
+ f'types are: {", ".join(CEPH_TYPES)}')
+
+ self.check_mon_command({
+ 'prefix': 'config set',
+ 'name': 'container_image',
+ 'value': image,
+ 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
+ })
+
@trivial_completion
def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
d = self.cache.get_daemon(daemon_name)
- self.log.info(f'{action} daemon {daemon_name}')
- return self._daemon_action(d.daemon_type, d.daemon_id,
- d.hostname, action, image=image)
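+ # Fail fast before container_image is changed: redeploying the active mgr
+ # requires a standby mgr to take over.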
+ if action == 'redeploy' and self.daemon_is_self(d.daemon_type, d.daemon_id) \
+ and not self.mgr_service.mgr_map_has_standby():
+ raise OrchestratorError(
+ f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
+
+ self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)
+
+ self.log.info(f'Schedule {action} daemon {daemon_name}')
+ return self._schedule_daemon_action(daemon_name, action)
+
+ def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
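+ """Return True if the given daemon is the mgr daemon we are running in."""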
+ return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
+
+ def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
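+ """Record the action in the cache for the serve loop to pick up and kick the
+ serve loop; redeploying the active mgr is refused unless a standby exists."""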
+ dd = self.cache.get_daemon(daemon_name)
+ if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
+ and not self.mgr_service.mgr_map_has_standby():
+ raise OrchestratorError(
+ f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
+ self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
+ msg = f"Scheduled to {action} {daemon_name} on host '{dd.hostname}'"
+ self._kick_serve_loop()
+ return msg
@trivial_completion
def remove_daemons(self, names):
dd.hostname, dd.name())
if last_deps is None:
last_deps = []
- reconfig = False
+ action = self.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
if not last_config:
self.log.info('Reconfiguring %s (unknown last config time)...' % (
dd.name()))
- reconfig = True
+ action = 'reconfig'
elif last_deps != deps:
self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
deps))
self.log.info('Reconfiguring %s (dependencies changed)...' % (
dd.name()))
- reconfig = True
+ action = 'reconfig'
elif self.last_monmap and \
self.last_monmap > last_config and \
dd.daemon_type in CEPH_TYPES:
self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
- reconfig = True
- if reconfig:
+ action = 'reconfig'
+ if action:
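+ # A redeploy scheduled via `daemon_action` takes precedence over a reconfig
+ # triggered by config, dependency or monmap changes.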
+ if self.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
+ and action == 'reconfig':
+ action = 'redeploy'
try:
- self._create_daemon(
- CephadmDaemonSpec(
- host=dd.hostname,
- daemon_id=dd.daemon_id,
- daemon_type=dd.daemon_type),
- reconfig=True)
+ self._daemon_action(
+ daemon_type=dd.daemon_type,
+ daemon_id=dd.daemon_id,
+ host=dd.hostname,
+ action=action
+ )
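+ # Clear any scheduled action now that the daemon has been handled.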
+ self.cache.rm_scheduled_daemon_action(dd.hostname, dd.name())
except OrchestratorError as e:
self.events.from_orch_error(e)
if dd.daemon_type in daemons_post:
return self.mgr._create_daemon(daemon_spec)
def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
- active_mgr_str = self.mgr.get('mgr_map')['active_name']
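+ # The orchestrator module runs inside the active mgr, so the active daemon
+ # is simply the one we are running in.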
for daemon in daemon_descrs:
- if daemon.daemon_id == active_mgr_str:
+ if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
return daemon
# if no active mgr found, return empty Daemon Desc
return DaemonDescription()
def fail_over(self):
- mgr_map = self.mgr.get('mgr_map')
- num = len(mgr_map.get('standbys'))
- if not num:
+ if not self.mgr_map_has_standby():
raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
'daemon', 'mgr' + self.mgr.get_mgr_id()))
'who': self.mgr.get_mgr_id(),
})
+ def mgr_map_has_standby(self) -> bool:
+ """
+ This is a bit safer than asking our inventory: if the mgr has joined the mgr map,
+ we know it has joined the cluster.
+ """
+ mgr_map = self.mgr.get('mgr_map')
+ num = len(mgr_map.get('standbys', []))
+ return bool(num)
+
class MdsService(CephadmService):
TYPE = 'mds'