import secrets
import orchestrator
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
- CLICommandMeta, DaemonDescription, DaemonDescriptionStatus
+ CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error
from orchestrator._interface import GenericSpec
from orchestrator._interface import daemon_type_to_service
# ------------------------------------------------------------------------------
-class CephadmCompletion(orchestrator.Completion[T]):
- def evaluate(self) -> None:
- self.finalize(None)
-
-
-def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
- """
- Decorator to make CephadmCompletion methods return
- a completion object that executes themselves.
- """
-
- @wraps(f)
- def wrapper(*args: Any, **kwargs: Any) -> CephadmCompletion:
- return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))
-
- return wrapper
-
-
def service_inactive(spec_name: str) -> Callable:
def inner(func: Callable) -> Callable:
@wraps(func)
self.health_checks: Dict[str, dict] = {}
- self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
-
self.inventory = Inventory(self)
self.cache = HostCache(self)
return True, err, ret
- def process(self, completions: List[CephadmCompletion]) -> None: # type: ignore
- """
- Does nothing, as completions are processed in another thread.
- """
- if completions:
- self.log.debug("process: completions={0}".format(
- orchestrator.pretty_print(completions)))
-
- for p in completions:
- p.evaluate()
-
@orchestrator._cli_write_command(
prefix='cephadm set-ssh-config')
def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
self.log.info('Added host %s' % spec.hostname)
return "Added host '{}'".format(spec.hostname)
- @trivial_completion
+ @handle_orch_error
def add_host(self, spec: HostSpec) -> str:
return self._add_host(spec)
- @trivial_completion
+ @handle_orch_error
def remove_host(self, host):
# type: (str) -> str
"""
self.log.info('Removed host %s' % host)
return "Removed host '{}'".format(host)
- @trivial_completion
+ @handle_orch_error
def update_host_addr(self, host: str, addr: str) -> str:
self.inventory.set_addr(host, addr)
self._reset_con(host)
self.log.info('Set host %s addr to %s' % (host, addr))
return "Updated host '{}' addr to '{}'".format(host, addr)
- @trivial_completion
+ @handle_orch_error
def get_hosts(self):
# type: () -> List[orchestrator.HostSpec]
"""
"""
return list(self.inventory.all_specs())
- @trivial_completion
+ @handle_orch_error
def add_host_label(self, host: str, label: str) -> str:
self.inventory.add_label(host, label)
self.log.info('Added label %s to host %s' % (label, host))
return 'Added label %s to host %s' % (label, host)
- @trivial_completion
+ @handle_orch_error
def remove_host_label(self, host: str, label: str) -> str:
self.inventory.rm_label(host, label)
self.log.info('Removed label %s to host %s' % (label, host))
+ 'Note the following:\n\n' + '\n'.join(notifications))
return 0, f'It is presumed safe to stop host {hostname}'
- @trivial_completion
+ @handle_orch_error
def host_ok_to_stop(self, hostname: str) -> str:
if hostname not in self.cache.get_hosts():
raise OrchestratorError(f'Cannot find host "{hostname}"')
}
self.set_health_checks(self.health_checks)
- @trivial_completion
+ @handle_orch_error
@host_exists()
def enter_host_maintenance(self, hostname: str, force: bool = False) -> str:
""" Attempt to place a cluster host in maintenance
return f"Ceph cluster {self._cluster_fsid} on {hostname} moved to maintenance"
- @trivial_completion
+ @handle_orch_error
@host_exists()
def exit_host_maintenance(self, hostname: str) -> str:
"""Exit maintenance mode and return a host to an operational state
self._kick_serve_loop()
- @trivial_completion
+ @handle_orch_error
def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
refresh: bool = False) -> List[orchestrator.ServiceDescription]:
if refresh:
sm[n].size = sm[n].size * 2
return list(sm.values())
- @trivial_completion
+ @handle_orch_error
def list_daemons(self,
service_name: Optional[str] = None,
daemon_type: Optional[str] = None,
result.append(dd)
return result
- @trivial_completion
+ @handle_orch_error
def service_action(self, action: str, service_name: str) -> List[str]:
dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
if not dds:
'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
})
- @trivial_completion
+ @handle_orch_error
def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
d = self.cache.get_daemon(daemon_name)
assert d.daemon_type is not None
self._kick_serve_loop()
return msg
- @trivial_completion
+ @handle_orch_error
def remove_daemons(self, names):
# type: (List[str]) -> List[str]
args = []
self.log.info('Remove daemons %s' % [a[0] for a in args])
return self._remove_daemons(args)
- @trivial_completion
+ @handle_orch_error
def remove_service(self, service_name: str) -> str:
self.log.info('Remove service %s' % service_name)
self._trigger_preview_refresh(service_name=service_name)
# must be idempotent: still a success.
return f'Failed to remove service. <{service_name}> was not found.'
- @trivial_completion
+ @handle_orch_error
def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
"""
Return the storage inventory of hosts matching the given filter.
inventory.Devices(dls)))
return result
- @trivial_completion
+ @handle_orch_error
def zap_device(self, host: str, path: str) -> str:
self.log.info('Zap device %s:%s' % (host, path))
out, err, code = CephadmServe(self)._run_cephadm(
raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
return '\n'.join(out + err)
- @trivial_completion
+ @handle_orch_error
def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
"""
Blink a device light. Calling something like::
self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
self.cache.osdspec_previews_refresh_queue.append(host)
- @trivial_completion
+ @handle_orch_error
def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
"""
Deprecated. Please use `apply()` instead.
"""
return [self._apply(spec) for spec in specs]
- @trivial_completion
+ @handle_orch_error
def create_osds(self, drive_group: DriveGroupSpec) -> str:
return self.osd_service.create_from_spec(drive_group)
return create_func_map(args)
- @trivial_completion
+ @handle_orch_error
def apply_mon(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def add_mon(self, spec):
# type: (ServiceSpec) -> List[str]
return self._add_daemon('mon', spec, self.mon_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def add_mgr(self, spec):
# type: (ServiceSpec) -> List[str]
return self._add_daemon('mgr', spec, self.mgr_service.prepare_create)
'remove': [d.hostname for d in remove_daemon_hosts]
}
- @trivial_completion
+ @handle_orch_error
def plan(self, specs: Sequence[GenericSpec]) -> List:
results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
'to the current inventory setup. If any on these conditions changes, the \n'
self._kick_serve_loop()
return "Scheduled %s update..." % spec.service_name()
- @trivial_completion
+ @handle_orch_error
def apply(self, specs: Sequence[GenericSpec]) -> List[str]:
results = []
for spec in specs:
results.append(self._apply(spec))
return results
- @trivial_completion
+ @handle_orch_error
def apply_mgr(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def add_mds(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('mds', spec, self.mds_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def apply_mds(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def add_rgw(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('rgw', spec, self.rgw_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def apply_rgw(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def apply_ha_rgw(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def add_iscsi(self, spec):
# type: (ServiceSpec) -> List[str]
return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def apply_iscsi(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def add_rbd_mirror(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def add_nfs(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('nfs', spec, self.nfs_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def apply_nfs(self, spec: ServiceSpec) -> str:
return self._apply(spec)
# type: () -> str
return self.get('mgr_map').get('services', {}).get('dashboard', '')
- @trivial_completion
+ @handle_orch_error
def add_prometheus(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def apply_prometheus(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def add_node_exporter(self, spec):
# type: (ServiceSpec) -> List[str]
return self._add_daemon('node-exporter', spec,
self.node_exporter_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def apply_node_exporter(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def add_crash(self, spec):
# type: (ServiceSpec) -> List[str]
return self._add_daemon('crash', spec,
self.crash_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def apply_crash(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def add_grafana(self, spec):
# type: (ServiceSpec) -> List[str]
return self._add_daemon('grafana', spec, self.grafana_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def apply_grafana(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def add_alertmanager(self, spec):
# type: (ServiceSpec) -> List[str]
return self._add_daemon('alertmanager', spec, self.alertmanager_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def apply_alertmanager(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def add_container(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('container', spec,
self.container_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def apply_container(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def add_cephadm_exporter(self, spec):
# type: (ServiceSpec) -> List[str]
return self._add_daemon('cephadm-exporter',
spec,
self.cephadm_exporter_service.prepare_create)
- @trivial_completion
+ @handle_orch_error
def apply_cephadm_exporter(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- @trivial_completion
+ @handle_orch_error
def upgrade_check(self, image: str, version: str) -> str:
if self.inventory.get_host_with_state("maintenance"):
raise OrchestratorError("check aborted - you have hosts in maintenance state")
return json.dumps(r, indent=4, sort_keys=True)
- @trivial_completion
+ @handle_orch_error
def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
return self.upgrade.upgrade_status()
- @trivial_completion
+ @handle_orch_error
def upgrade_start(self, image: str, version: str) -> str:
if self.inventory.get_host_with_state("maintenance"):
raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state")
return self.upgrade.upgrade_start(image, version)
- @trivial_completion
+ @handle_orch_error
def upgrade_pause(self) -> str:
return self.upgrade.upgrade_pause()
- @trivial_completion
+ @handle_orch_error
def upgrade_resume(self) -> str:
return self.upgrade.upgrade_resume()
- @trivial_completion
+ @handle_orch_error
def upgrade_stop(self) -> str:
return self.upgrade.upgrade_stop()
- @trivial_completion
+ @handle_orch_error
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
force: bool = False) -> str:
self._kick_serve_loop()
return "Scheduled OSD(s) for removal"
- @trivial_completion
+ @handle_orch_error
def stop_remove_osds(self, osd_ids: List[str]) -> str:
"""
Stops a `removal` process for a List of OSDs.
self._kick_serve_loop()
return "Stopped OSD(s) removal"
- @trivial_completion
+ @handle_orch_error
def remove_osds_status(self) -> List[OSD]:
"""
The CLI call to retrieve an osd removal report