r.append(dd)
return r
+ def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
+ for _, dm in self.daemons.items():
+ for _, dd in dm.items():
+ if dd.name() == daemon_name:
+ return dd
+ raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
+
def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
def alter(host, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
dd = copy(dd_orig)
).name()):
return self._daemon_action(daemon_type, daemon_id, host, action)
- def _daemon_action(self, daemon_type, daemon_id, host, action):
+ def _daemon_action(self, daemon_type, daemon_id, host, action, image=None):
daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
host=host,
daemon_id=daemon_id,
daemon_type=daemon_type,
)
+ if image is not None:
+ if action != 'redeploy':
+ raise OrchestratorError(
+ f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
+ if daemon_type not in CEPH_TYPES:
+ raise OrchestratorError(
+ f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
+ f'types are: {", ".join(CEPH_TYPES)}')
+
+ self.check_mon_command({
+ 'prefix': 'config set',
+ 'name': 'container_image',
+ 'value': image,
+ 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
+ })
+
if action == 'redeploy':
# stop, recreate the container+unit, then restart
return self._create_daemon(daemon_spec)
except Exception:
self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
self.cache.invalidate_host_daemons(daemon_spec.host)
- return "{} {} from host '{}'".format(action, name, daemon_spec.host)
+ msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
+ self.events.for_daemon(name, 'INFO', msg)
+ return msg
@trivial_completion
- def daemon_action(self, action, daemon_type, daemon_id) -> List[str]:
- args = []
- for host, dm in self.cache.daemons.items():
- for name, d in dm.items():
- if d.daemon_type == daemon_type and d.daemon_id == daemon_id:
- args.append((d.daemon_type, d.daemon_id,
- d.hostname, action))
- if not args:
- raise orchestrator.OrchestratorError(
- 'Unable to find %s.%s daemon(s)' % (
- daemon_type, daemon_id))
- self.log.info('%s daemons %s' % (
- action.capitalize(),
- ','.join(['%s.%s' % (a[0], a[1]) for a in args])))
- return self._daemon_actions(args)
+ def daemon_action(self, action: str, daemon_name: str, image: Optional[str]=None) -> str:
+ d = self.cache.get_daemon(daemon_name)
+
+ self.log.info(f'{action} daemon {daemon_name}')
+ return self._daemon_action(d.daemon_type, d.daemon_id,
+ d.hostname, action, image=image)
@trivial_completion
def remove_daemons(self, names):
with with_host(cephadm_module, 'test'):
with with_daemon(cephadm_module, RGWSpec(service_id='myrgw.foobar'), CephadmOrchestrator.add_rgw, 'test') as daemon_id:
- c = cephadm_module.daemon_action('redeploy', 'rgw', daemon_id)
- assert wait(cephadm_module, c) == [f"Deployed rgw.{daemon_id} on host 'test'"]
+ c = cephadm_module.daemon_action('redeploy', 'rgw.' + daemon_id)
+ assert wait(cephadm_module, c) == f"Deployed rgw.{daemon_id} on host 'test'"
for what in ('start', 'stop', 'restart'):
- c = cephadm_module.daemon_action(what, 'rgw', daemon_id)
- assert wait(cephadm_module, c) == [what + f" rgw.{daemon_id} from host 'test'"]
+ c = cephadm_module.daemon_action(what, 'rgw.' + daemon_id)
+ assert wait(cephadm_module, c) == what + f" rgw.{daemon_id} from host 'test'"
# Make sure, _check_daemons does a redeploy due to monmap change:
cephadm_module._store['_ceph_get/mon_map'] = {
#assert action in ["start", "stop", "reload, "restart", "redeploy"]
raise NotImplementedError()
- def daemon_action(self, action, daemon_type, daemon_id):
- # type: (str, str, str) -> Completion[List[str]]
+ def daemon_action(self, action: str, daemon_name: str, image: Optional[str]=None) -> Completion[str]:
"""
Perform an action (start/stop/reload) on a daemon.
:param action: one of "start", "stop", "restart", "redeploy", "reconfig"
- :param name: name of daemon
+ :param daemon_name: name of daemon
+ :param image: Container image when redeploying that daemon
:rtype: Completion
"""
#assert action in ["start", "stop", "reload, "restart", "redeploy"]
@_cli_write_command(
'orch daemon',
- "name=action,type=CephChoices,strings=start|stop|restart|redeploy|reconfig "
+ "name=action,type=CephChoices,strings=start|stop|restart|reconfig "
"name=name,type=CephString",
- 'Start, stop, restart, redeploy, or reconfig a specific daemon')
+ 'Start, stop, restart, (redeploy,) or reconfig a specific daemon')
def _daemon_action(self, action, name):
if '.' not in name:
raise OrchestratorError('%s is not a valid daemon name' % name)
- (daemon_type, daemon_id) = name.split('.', 1)
- completion = self.daemon_action(action, daemon_type, daemon_id)
+ completion = self.daemon_action(action, name)
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result_str())
+
+ @_cli_write_command(
+ 'orch daemon redeploy',
+ "name=name,type=CephString "
+ "name=image,type=CephString,req=false",
+ 'Redeploy a daemon (with a specifc image)')
+ def _daemon_action_redeploy(self, name, image):
+ if '.' not in name:
+ raise OrchestratorError('%s is not a valid daemon name' % name)
+ completion = self.daemon_action("redeploy", name, image=image)
self._orchestrator_wait([completion])
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
pass
@deferred_write("daemon_action")
- def daemon_action(self, action, daemon_type, daemon_id):
+ def daemon_action(self, action, daemon_name, image=None):
pass
@deferred_write("Adding NFS service")