"""
import time
+try:
+ from typing import TypeVar, Generic, List
+ T = TypeVar('T')
+ G = Generic[T]
+except ImportError:
+ T, G = object, object
-class _Completion(object):
+
+class _Completion(G):
@property
def result(self):
+ # type: () -> T
"""
Return the result of the operation that we were waited
for. Only valid after calling Orchestrator.wait() on this
@property
def is_read(self):
+ # type: () -> bool
raise NotImplementedError()
@property
def is_complete(self):
+ # type: () -> bool
raise NotImplementedError()
@property
def is_errored(self):
+ # type: () -> bool
+ raise NotImplementedError()
+
+ @property
+ def should_wait(self):
+ # type: () -> bool
raise NotImplementedError()
"""
return not self.is_complete
+
class WriteCompletion(_Completion):
"""
``Orchestrator`` implementations should inherit from this
@property
def is_persistent(self):
+ # type: () -> bool
"""
Has the operation updated the orchestrator's configuration
persistently? Typically this would indicate that an update
"""
return not self.is_persistent
+
class Orchestrator(object):
"""
Calls in this class may do long running remote operations, with time
raise NotImplementedError()
def get_inventory(self, node_filter=None):
+ # type: (InventoryFilter) -> ReadCompletion[List[InventoryNode]]
"""
+ Returns something that was created by `ceph-volume inventory`.
- :param node_filter:
:return: list of InventoryNode
"""
raise NotImplementedError()
def describe_service(self, service_type=None, service_id=None, node_name=None):
+ # type: (str, str, str) -> ReadCompletion[List[ServiceDescription]]
"""
Describe a service (of any kind) that is already configured in
the orchestrator. For example, when viewing an OSD in the dashboard
raise NotImplementedError()
def create_osds(self, osd_spec):
+ # type: (OsdCreationSpec) -> WriteCompletion
"""
Create one or more OSDs within a single Drive Group.
raise NotImplementedError()
def replace_osds(self, osd_spec):
+ # type: (OsdCreationSpec) -> WriteCompletion
"""
Like create_osds, but the osd_id_claims must be fully
populated.
raise NotImplementedError()
def remove_osds(self, node, osd_ids):
+ # type: (str, List[str]) -> WriteCompletion
"""
:param node: A node name, must exist.
:param osd_ids: list of OSD IDs
raise NotImplementedError()
def add_stateless_service(self, service_type, spec):
- assert isinstance(spec, StatelessServiceSpec)
+ # type: (str, StatelessServiceSpec) -> WriteCompletion
+ """
+ Installing and adding a completely new service to the cluster.
+
+ This is not about starting services.
+ """
raise NotImplementedError()
def update_stateless_service(self, service_type, id_, spec):
- assert isinstance(spec, StatelessServiceSpec)
+ # type: (str, str, StatelessServiceSpec) -> WriteCompletion
+ """
+ This is about changing / redeploying existing services. Like for
+ example changing the number of service instances.
+
+ :rtype: WriteCompletion
+ """
raise NotImplementedError()
def remove_stateless_service(self, service_type, id_):
+ # type: (str, str) -> WriteCompletion
+ """
+ Uninstalls an existing service from the cluster.
+
+ This is not about stopping services.
+ """
raise NotImplementedError()
def add_mon(self, node_name):
+ # type: (str) -> WriteCompletion
"""
We operate on a node rather than a particular device: it is
assumed/expected that proper SSD storage is already available
and accessible in /var.
:param node_name:
- :return:
"""
raise NotImplementedError()
def remove_mon(self, node_name):
+ # type: (str) -> WriteCompletion
"""
-
- :param node_name:
- :return:
+ :param node_name: Remove MON from that host.
"""
raise NotImplementedError()
def upgrade_start(self, upgrade_spec):
- assert isinstance(upgrade_spec, UpgradeSpec)
+ # type: (UpgradeSpec) -> WriteCompletion
raise NotImplementedError()
def upgrade_status(self):
+ # type: () -> ReadCompletion[UpgradeStatusSpec]
"""
If an upgrade is currently underway, report on where
we are in the process, or if some error has occurred.
raise NotImplementedError()
def upgrade_available(self):
+ # type: () -> ReadCompletion[List[str]]
"""
Report on what versions are available to upgrade to
'status': self.status,
'status_desc': self.status_desc,
}
- return {k:v for (k,v) in out.items() if v is not None}
+ return {k: v for (k, v) in out.items() if v is not None}
class DriveGroupSpec(object):
self.devices = devices # type: List[InventoryDevice]
def to_json(self):
- return {'name': self.name, 'devices': [d.to_json() for d in self.devices]}
+ return {'name': self.name, 'devices': [d.to_json() for d in self.devices]}
def _mk_orch_methods(cls):