CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
-class CephadmCompletion(orchestrator.Completion):
+class CephadmCompletion(orchestrator.Completion[T]):
def evaluate(self):
self.finalize(None)
-def trivial_completion(f: Callable) -> Callable[..., CephadmCompletion]:
+def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
"""
Decorator to make CephadmCompletion methods return
a completion object that executes themselves.
return "Removed host '{}'".format(host)
@trivial_completion
- def update_host_addr(self, host, addr):
+ def update_host_addr(self, host, addr) -> str:
self.inventory.set_addr(host, addr)
self._reset_con(host)
self.event.set() # refresh stray health check
return list(self.inventory.all_specs())
@trivial_completion
- def add_host_label(self, host, label):
+ def add_host_label(self, host, label) -> str:
self.inventory.add_label(host, label)
self.log.info('Added label %s to host %s' % (label, host))
return 'Added label %s to host %s' % (label, host)
@trivial_completion
- def remove_host_label(self, host, label):
+ def remove_host_label(self, host, label) -> str:
self.inventory.rm_label(host, label)
self.log.info('Removed label %s to host %s' % (label, host))
return 'Removed label %s from host %s' % (label, host)
@trivial_completion
def describe_service(self, service_type=None, service_name=None,
- refresh=False):
+ refresh=False) -> List[orchestrator.ServiceDescription]:
if refresh:
# ugly sync path, FIXME someday perhaps?
for host in self.inventory.keys():
@trivial_completion
def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None,
- host=None, refresh=False):
+ host=None, refresh=False) -> List[orchestrator.DaemonDescription]:
if refresh:
# ugly sync path, FIXME someday perhaps?
if host:
return result
@trivial_completion
- def service_action(self, action, service_name):
+ def service_action(self, action, service_name) -> List[str]:
args = []
for host, dm in self.cache.daemons.items():
for name, d in dm.items():
return self._daemon_actions(args)
@forall_hosts
- def _daemon_actions(self, daemon_type, daemon_id, host, action):
+ def _daemon_actions(self, daemon_type, daemon_id, host, action) -> str:
with set_exception_subject('daemon', DaemonDescription(
daemon_type=daemon_type,
daemon_id=daemon_id
return "{} {} from host '{}'".format(action, name, daemon_spec.host)
@trivial_completion
- def daemon_action(self, action, daemon_type, daemon_id):
+ def daemon_action(self, action, daemon_type, daemon_id) -> List[str]:
args = []
for host, dm in self.cache.daemons.items():
for name, d in dm.items():
return self._remove_daemons(args)
@trivial_completion
- def remove_service(self, service_name):
+ def remove_service(self, service_name) -> str:
self.log.info('Remove service %s' % service_name)
self._trigger_preview_refresh(service_name=service_name)
found = self.spec_store.rm(service_name)
if found:
self._kick_serve_loop()
- return ['Removed service %s' % service_name]
+ return 'Removed service %s' % service_name
else:
# must be idempotent: still a success.
- return [f'Failed to remove service. <{service_name}> was not found.']
+ return f'Failed to remove service. <{service_name}> was not found.'
@trivial_completion
- def get_inventory(self, host_filter=None, refresh=False):
+ def get_inventory(self, host_filter=None, refresh=False) -> List[orchestrator.InventoryHost]:
"""
Return the storage inventory of hosts matching the given filter.
return result
@trivial_completion
- def zap_device(self, host, path):
+ def zap_device(self, host, path) -> str:
self.log.info('Zap device %s:%s' % (host, path))
out, err, code = self._run_cephadm(
host, 'osd', 'ceph-volume',
return '\n'.join(out + err)
@trivial_completion
- def blink_device_light(self, ident_fault, on, locs):
+ def blink_device_light(self, ident_fault, on, locs) -> List[str]:
@forall_hosts
def blink(host, dev, path):
cmd = [
self.cache.osdspec_previews_refresh_queue.append(host)
@trivial_completion
- def apply_drivegroups(self, specs: List[DriveGroupSpec]):
+ def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
"""
Deprecated. Please use `apply()` instead.
return [self._apply(spec) for spec in specs]
@trivial_completion
- def create_osds(self, drive_group: DriveGroupSpec):
+ def create_osds(self, drive_group: DriveGroupSpec) -> str:
return self.osd_service.create_from_spec(drive_group)
def _preview_osdspecs(self,
return create_func_map(args)
@trivial_completion
- def apply_mon(self, spec):
+ def apply_mon(self, spec) -> str:
return self._apply(spec)
@trivial_completion
}
@trivial_completion
- def plan(self, specs: List[GenericSpec]):
+ def plan(self, specs: List[GenericSpec]) -> List:
results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
'to the current inventory setup. If any on these conditions changes, the \n'
'preview will be invalid. Please make sure to have a minimal \n'
return "Scheduled %s update..." % spec.service_name()
@trivial_completion
- def apply(self, specs: List[GenericSpec]):
+ def apply(self, specs: List[GenericSpec]) -> List[str]:
results = []
for spec in specs:
results.append(self._apply(spec))
return results
@trivial_completion
- def apply_mgr(self, spec):
+ def apply_mgr(self, spec) -> str:
return self._apply(spec)
@trivial_completion
- def add_mds(self, spec: ServiceSpec):
+ def add_mds(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('mds', spec, self.mds_service.create, self.mds_service.config)
@trivial_completion
- def apply_mds(self, spec: ServiceSpec):
+ def apply_mds(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
- def add_rgw(self, spec):
+ def add_rgw(self, spec) -> List[str]:
return self._add_daemon('rgw', spec, self.rgw_service.create, self.rgw_service.config)
@trivial_completion
- def apply_rgw(self, spec):
+ def apply_rgw(self, spec) -> str:
return self._apply(spec)
@trivial_completion
return self._add_daemon('iscsi', spec, self.iscsi_service.create, self.iscsi_service.config)
@trivial_completion
- def apply_iscsi(self, spec):
+ def apply_iscsi(self, spec) -> str:
return self._apply(spec)
@trivial_completion
- def add_rbd_mirror(self, spec):
+ def add_rbd_mirror(self, spec) -> List[str]:
return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.create)
@trivial_completion
- def apply_rbd_mirror(self, spec):
+ def apply_rbd_mirror(self, spec) -> str:
return self._apply(spec)
@trivial_completion
- def add_nfs(self, spec):
+ def add_nfs(self, spec) -> List[str]:
return self._add_daemon('nfs', spec, self.nfs_service.create, self.nfs_service.config)
@trivial_completion
- def apply_nfs(self, spec):
+ def apply_nfs(self, spec) -> str:
return self._apply(spec)
def _get_dashboard_url(self):
return self.get('mgr_map').get('services', {}).get('dashboard', '')
@trivial_completion
- def add_prometheus(self, spec):
+ def add_prometheus(self, spec) -> List[str]:
return self._add_daemon('prometheus', spec, self.prometheus_service.create)
@trivial_completion
- def apply_prometheus(self, spec):
+ def apply_prometheus(self, spec) -> str:
return self._apply(spec)
@trivial_completion
self.node_exporter_service.create)
@trivial_completion
- def apply_node_exporter(self, spec):
+ def apply_node_exporter(self, spec) -> str:
return self._apply(spec)
@trivial_completion
self.crash_service.create)
@trivial_completion
- def apply_crash(self, spec):
+ def apply_crash(self, spec) -> str:
return self._apply(spec)
@trivial_completion
return self._add_daemon('grafana', spec, self.grafana_service.create)
@trivial_completion
- def apply_grafana(self, spec: ServiceSpec):
+ def apply_grafana(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
return self._add_daemon('alertmanager', spec, self.alertmanager_service.create)
@trivial_completion
- def apply_alertmanager(self, spec: ServiceSpec):
+ def apply_alertmanager(self, spec: ServiceSpec) -> str:
return self._apply(spec)
def _get_container_image_id(self, image_name):
return image_id, ceph_version
@trivial_completion
- def upgrade_check(self, image, version):
+ def upgrade_check(self, image, version) -> str:
if version:
target_name = self.container_image_base + ':v' + version
elif image:
return json.dumps(r, indent=4, sort_keys=True)
@trivial_completion
- def upgrade_status(self):
+ def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
return self.upgrade.upgrade_status()
@trivial_completion
- def upgrade_start(self, image, version):
+ def upgrade_start(self, image, version) -> str:
return self.upgrade.upgrade_start(image, version)
@trivial_completion
- def upgrade_pause(self):
+ def upgrade_pause(self) -> str:
return self.upgrade.upgrade_pause()
@trivial_completion
- def upgrade_resume(self):
+ def upgrade_resume(self) -> str:
return self.upgrade.upgrade_resume()
@trivial_completion
- def upgrade_stop(self):
+ def upgrade_stop(self) -> str:
return self.upgrade.upgrade_stop()
@trivial_completion
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
- force: bool = False):
+ force: bool = False) -> str:
"""
Takes a list of OSDs and schedules them for removal.
The function that takes care of the actual removal is
return "Scheduled OSD(s) for removal"
@trivial_completion
- def remove_osds_status(self):
+ def remove_osds_status(self) -> Set[OSDRemoval]:
"""
The CLI call to retrieve an osd removal report
"""
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
+T = TypeVar('T')
class OrchestratorError(Exception):
"""
self.progress = 1
-class Completion(_Promise):
+class Completion(_Promise, Generic[T]):
"""
Combines multiple promises into one overall operation.
)
)
- def fail(self, e):
+ def fail(self, e: Exception):
super(Completion, self).fail(e)
if self._progress_reference:
self._progress_reference.fail()
- def finalize(self, result=_Promise.NO_RESULT):
+ def finalize(self, result: Union[None, object, T]=_Promise.NO_RESULT):
if self._first_promise._state == self.INITIALIZED:
self._first_promise._finalize(result)
@property
- def result(self):
+ def result(self) -> T:
"""
The result of the operation that we were waited
for. Only valid after calling Orchestrator.process() on this
"""
last = self._last_promise()
assert last._state == _Promise.FINISHED
- return last._value
+ return cast(T, last._value)
- def result_str(self):
+ def result_str(self) -> str:
"""Force a string."""
if self.result is None:
return ''
raise e
-class TrivialReadCompletion(Completion):
+class TrivialReadCompletion(Completion[T]):
"""
This is the trivial completion simply wrapping a result.
"""
- def __init__(self, result):
+ def __init__(self, result: T):
super(TrivialReadCompletion, self).__init__()
if result:
self.finalize(result)
raise NotImplementedError()
def add_host(self, host_spec):
- # type: (HostSpec) -> Completion
+ # type: (HostSpec) -> Completion[str]
"""
Add a host to the orchestrator inventory.
raise NotImplementedError()
def remove_host(self, host):
- # type: (str) -> Completion
+ # type: (str) -> Completion[str]
"""
Remove a host from the orchestrator inventory.
raise NotImplementedError()
def update_host_addr(self, host, addr):
- # type: (str, str) -> Completion
+ # type: (str, str) -> Completion[str]
"""
Update a host's address
raise NotImplementedError()
def get_hosts(self):
- # type: () -> Completion
+ # type: () -> Completion[List[HostSpec]]
"""
Report the hosts in the cluster.
raise NotImplementedError()
def add_host_label(self, host, label):
- # type: (str, str) -> Completion
+ # type: (str, str) -> Completion[str]
"""
Add a host label
"""
raise NotImplementedError()
def remove_host_label(self, host, label):
- # type: (str, str) -> Completion
+ # type: (str, str) -> Completion[str]
"""
Remove a host label
"""
raise NotImplementedError()
def get_inventory(self, host_filter=None, refresh=False):
- # type: (Optional[InventoryFilter], bool) -> Completion
+ # type: (Optional[InventoryFilter], bool) -> Completion[List[InventoryHost]]
"""
Returns something that was created by `ceph-volume inventory`.
raise NotImplementedError()
def describe_service(self, service_type=None, service_name=None, refresh=False):
- # type: (Optional[str], Optional[str], bool) -> Completion
+ # type: (Optional[str], Optional[str], bool) -> Completion[List[ServiceDescription]]
"""
Describe a service (of any kind) that is already configured in
the orchestrator. For example, when viewing an OSD in the dashboard
raise NotImplementedError()
def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
- # type: (Optional[str], Optional[str], Optional[str], Optional[str], bool) -> Completion
+ # type: (Optional[str], Optional[str], Optional[str], Optional[str], bool) -> Completion[List[DaemonDescription]]
"""
Describe a daemon (of any kind) that is already configured in
the orchestrator.
"""
raise NotImplementedError()
- def apply(self, specs: List["GenericSpec"]) -> Completion:
+ def apply(self, specs: List["GenericSpec"]) -> Completion[List[str]]:
"""
Applies any spec
"""
completion = completion.then(next)
return completion
- def plan(self, spec: List["GenericSpec"]):
+ def plan(self, spec: List["GenericSpec"]) -> Completion[List]:
"""
Plan (Dry-run, Preview) a List of Specs.
"""
raise NotImplementedError()
def remove_daemons(self, names):
- # type: (List[str]) -> Completion
+ # type: (List[str]) -> Completion[List[str]]
"""
Remove specific daemon(s).
raise NotImplementedError()
def remove_service(self, service_name):
- # type: (str) -> Completion
+ # type: (str) -> Completion[str]
"""
Remove a service (a collection of daemons).
raise NotImplementedError()
def service_action(self, action, service_name):
- # type: (str, str) -> Completion
+ # type: (str, str) -> Completion[List[str]]
"""
Perform an action (start/stop/reload) on a service (i.e., all daemons
providing the logical service).
raise NotImplementedError()
def daemon_action(self, action, daemon_type, daemon_id):
- # type: (str, str, str) -> Completion
+ # type: (str, str, str) -> Completion[List[str]]
"""
Perform an action (start/stop/reload) on a daemon.
raise NotImplementedError()
def create_osds(self, drive_group):
- # type: (DriveGroupSpec) -> Completion
+ # type: (DriveGroupSpec) -> Completion[str]
"""
Create one or more OSDs within a single Drive Group.
"""
raise NotImplementedError()
- def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion:
+ def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]:
""" Update OSD cluster """
raise NotImplementedError()
def preview_osdspecs(self,
osdspec_name: Optional[str] = 'osd',
osdspecs: Optional[List[DriveGroupSpec]] = None
- ) -> Completion:
+ ) -> Completion[str]:
""" Get a preview for OSD deployments """
raise NotImplementedError()
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
- force: bool = False) -> Completion:
+ force: bool = False) -> Completion[str]:
"""
:param osd_ids: list of OSD IDs
:param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
raise NotImplementedError()
def blink_device_light(self, ident_fault, on, locations):
- # type: (str, bool, List[DeviceLightLoc]) -> Completion
+ # type: (str, bool, List[DeviceLightLoc]) -> Completion[List[str]]
"""
Instructs the orchestrator to enable or disable either the ident or the fault LED.
raise NotImplementedError()
def zap_device(self, host, path):
- # type: (str, str) -> Completion
+ # type: (str, str) -> Completion[str]
"""Zap/Erase a device (DESTROYS DATA)"""
raise NotImplementedError()
def add_mon(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[List[str]]
"""Create mon daemon(s)"""
raise NotImplementedError()
def apply_mon(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[str]
"""Update mon cluster"""
raise NotImplementedError()
def add_mgr(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[List[str]]
"""Create mgr daemon(s)"""
raise NotImplementedError()
def apply_mgr(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[str]
"""Update mgr cluster"""
raise NotImplementedError()
def add_mds(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[List[str]]
"""Create MDS daemon(s)"""
raise NotImplementedError()
def apply_mds(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[str]
"""Update MDS cluster"""
raise NotImplementedError()
def add_rgw(self, spec):
- # type: (RGWSpec) -> Completion
+ # type: (RGWSpec) -> Completion[List[str]]
"""Create RGW daemon(s)"""
raise NotImplementedError()
def apply_rgw(self, spec):
- # type: (RGWSpec) -> Completion
+ # type: (RGWSpec) -> Completion[str]
"""Update RGW cluster"""
raise NotImplementedError()
def add_rbd_mirror(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[List[str]]
"""Create rbd-mirror daemon(s)"""
raise NotImplementedError()
def apply_rbd_mirror(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[str]
"""Update rbd-mirror cluster"""
raise NotImplementedError()
def add_nfs(self, spec):
- # type: (NFSServiceSpec) -> Completion
+ # type: (NFSServiceSpec) -> Completion[List[str]]
"""Create NFS daemon(s)"""
raise NotImplementedError()
def apply_nfs(self, spec):
- # type: (NFSServiceSpec) -> Completion
+ # type: (NFSServiceSpec) -> Completion[str]
"""Update NFS cluster"""
raise NotImplementedError()
def add_iscsi(self, spec):
- # type: (IscsiServiceSpec) -> Completion
+ # type: (IscsiServiceSpec) -> Completion[List[str]]
"""Create iscsi daemon(s)"""
raise NotImplementedError()
def apply_iscsi(self, spec):
- # type: (IscsiServiceSpec) -> Completion
+ # type: (IscsiServiceSpec) -> Completion[str]
"""Update iscsi cluster"""
raise NotImplementedError()
def add_prometheus(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[List[str]]
"""Create new prometheus daemon"""
raise NotImplementedError()
def apply_prometheus(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[str]
"""Update prometheus cluster"""
raise NotImplementedError()
def add_node_exporter(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[List[str]]
"""Create a new Node-Exporter service"""
raise NotImplementedError()
def apply_node_exporter(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[str]
"""Update existing a Node-Exporter daemon(s)"""
raise NotImplementedError()
def add_crash(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[List[str]]
"""Create a new crash service"""
raise NotImplementedError()
def apply_crash(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[str]
"""Update existing a crash daemon(s)"""
raise NotImplementedError()
def add_grafana(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[List[str]]
"""Create a new Node-Exporter service"""
raise NotImplementedError()
def apply_grafana(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[str]
"""Update existing a Node-Exporter daemon(s)"""
raise NotImplementedError()
def add_alertmanager(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[List[str]]
"""Create a new AlertManager service"""
raise NotImplementedError()
def apply_alertmanager(self, spec):
- # type: (ServiceSpec) -> Completion
+ # type: (ServiceSpec) -> Completion[str]
"""Update an existing AlertManager daemon(s)"""
raise NotImplementedError()
def upgrade_check(self, image, version):
- # type: (Optional[str], Optional[str]) -> Completion
+ # type: (Optional[str], Optional[str]) -> Completion[str]
raise NotImplementedError()
def upgrade_start(self, image, version):
- # type: (Optional[str], Optional[str]) -> Completion
+ # type: (Optional[str], Optional[str]) -> Completion[str]
raise NotImplementedError()
def upgrade_pause(self):
- # type: () -> Completion
+ # type: () -> Completion[str]
raise NotImplementedError()
def upgrade_resume(self):
- # type: () -> Completion
+ # type: () -> Completion[str]
raise NotImplementedError()
def upgrade_stop(self):
- # type: () -> Completion
+ # type: () -> Completion[str]
raise NotImplementedError()
def upgrade_status(self):
- # type: () -> Completion
+ # type: () -> Completion[UpgradeStatusSpec]
"""
If an upgrade is currently underway, report on where
we are in the process, or if some error has occurred.