Please see the ceph-mgr module developer's guide for more information.
"""
+import functools
+import logging
import sys
import time
from collections import namedtuple
try:
from ceph.deployment.drive_group import DriveGroupSpec
- from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator
+ from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, Type
- T = TypeVar('T')
- G = Generic[T]
except ImportError:
- T, G = object, object
+ pass
+
+T = TypeVar('T')
+U = TypeVar('U')
+V = TypeVar('V')
+G = Generic[T]
+Promises = TypeVar('Promises', bound='_Promise')
+Completions = TypeVar('Completions', bound='Completion')
+
+logger = logging.getLogger(__name__)
+
def parse_host_specs(host, require_network=True):
return CLICommand(prefix, cmd_args, desc, perm)(wrapper)
+
def _cli_command(perm):
def inner_cli_command(prefix, cmd_args="", desc=""):
return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
return inner_cli_command
+
_cli_read_command = _cli_command('r')
_cli_write_command = _cli_command('rw')
-class _Completion(G):
+def _no_result():
+ return object()
+
+
+class _Promise(Generic[T]):
+ """
+ A completion may need multiple promises to be fulfilled. `_Promise` is one
+ step.
+
+ Typically ``Orchestrator`` implementations inherit from this class to
+ build their own way of finishing a step to fulfil a future.
+
+ They are not exposed in the orchestrator interface and can be seen as a
+ helper to build orchestrator modules.
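+
+ For example (an illustrative sketch; ``add_one`` and ``double`` are
+ hypothetical callbacks)::
+
+     p = _Promise(on_complete=add_one)
+     p.then(double)
+     p.finalize(1)  # evaluates the chain: double(add_one(1)) == 4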
+ """
+ INITIALIZED = 1 # We have a parent completion and a next completion
+ FINISHED = 2 # we have a final result
+
+ NO_RESULT = _no_result() # type: None
+
+ def __init__(self,
+ _first_promise=None, # type: Optional["_Promise[V]"]
+ value=NO_RESULT, # type: Optional[T]
+ on_complete=None # type: Optional[Callable[[T], Union[U, _Promise[U]]]]
+ ):
+ self._on_complete = on_complete
+ self._next_promise = None # type: Optional[_Promise[U]]
+
+ self._state = self.INITIALIZED
+ self._exception = None # type: Optional[Exception]
+
+ # Value of this _Promise. May be an intermediate result.
+ self._value = value
+
+ # _Promise is not a continuation monad, as `_result` is of type
+ # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
+ self._first_promise = _first_promise or self # type: 'Completion'
+
+ def __repr__(self):
+ name = getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None'
+ val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
+ return '{}(_s={}, val={}, id={}, name={}, pr={}, _next={})'.format(
+ self.__class__, self._state, val, id(self), name, getattr(self, '_progress_reference', 'NA'), repr(self._next_promise)
+ )
+
+ def then(self, on_complete):
+ # type: (Promises, Callable[[T], Union[U, _Promise[U]]]) -> Promises[U]
+ """
+ Call ``on_complete`` as soon as this promise is finalized.
+ """
+ assert self._state is self.INITIALIZED
+ if self._on_complete is not None:
+ assert self._next_promise is None
+ self._set_next_promise(self.__class__(
+ _first_promise=self._first_promise,
+ on_complete=on_complete
+ ))
+ return self._next_promise
+
+ else:
+ self._on_complete = on_complete
+ self._set_next_promise(self.__class__(_first_promise=self._first_promise))
+ return self._next_promise
+
+ def _set_next_promise(self, next):
+ # type: (_Promise[U]) -> None
+ assert self is not next
+ assert self._state is self.INITIALIZED
+
+ self._next_promise = next
+ assert self._next_promise is not None
+ for p in iter(self._next_promise):
+ p._first_promise = self._first_promise
+
+ def finalize(self, value=NO_RESULT):
+ # type: (Optional[T]) -> None
+ """
+ Sets this promise to complete.
+
+ Orchestrators may choose to use this helper function.
+
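+ For example, a backend might finish a read step with a freshly fetched
+ value (``fetch_inventory`` is a hypothetical callable)::
+
+     promise.finalize(fetch_inventory())
+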
+ :param value: new value.
+ """
+ assert self._state is self.INITIALIZED
+
+ if value is not self.NO_RESULT:
+ self._value = value
+ assert self._value is not self.NO_RESULT
+
+ if self._on_complete:
+ try:
+ next_result = self._on_complete(self._value)
+ except Exception as e:
+ self.fail(e)
+ return
+ else:
+ next_result = self._value
+
+ if isinstance(next_result, _Promise):
+ # hack: _Promise is not a continuation monad.
+ next_result = next_result._first_promise # type: ignore
+ assert next_result not in self, repr(self._first_promise) + repr(next_result)
+ assert self not in next_result
+ next_result._append_promise(self._next_promise)
+ self._set_next_promise(next_result)
+ if self._next_promise._value is self.NO_RESULT:
+ self._next_promise._value = self._value
+ else:
+ # simple map. simply forward
+ if self._next_promise:
+ self._next_promise._value = next_result
+ else:
+ # Hack: next_result is of type U, _value is of type T
+ self._value = next_result # type: ignore
+ self._state = self.FINISHED
+ logger.debug('finalized {}'.format(repr(self)))
+ self.propagate_to_next()
+
+ def propagate_to_next(self):
+ assert self._state is self.FINISHED
+ if self._next_promise:
+ self._next_promise.finalize()
+
+ def fail(self, e):
+ # type: (Exception) -> None
+ """
+ Sets the whole completion to be failed with this exception and ends the
+ evaluation.
+ """
+ assert self._state is self.INITIALIZED
+ logger.exception('_Promise failed')
+ self._exception = e
+ self._value = 'exception'
+ if self._next_promise:
+ self._next_promise.fail(e)
+ self._state = self.FINISHED
+
+ def __contains__(self, item):
+ return any(item is p for p in iter(self._first_promise))
+
+ def __iter__(self):
+ yield self
+ elem = self._next_promise
+ while elem is not None:
+ yield elem
+ elem = elem._next_promise
+
+ def _append_promise(self, other):
+ if other is not None:
+ assert self not in other
+ assert other not in self
+ self._last_promise()._set_next_promise(other)
+
+ def _last_promise(self):
+ # type: () -> _Promise
+ return list(iter(self))[-1]
+
+
+class ProgressReference(object):
+ def __init__(self,
+ message, # type: str
+ mgr,
+ completion=None # type: Optional[Callable[[], Completion[float]]]
+ ):
+ """
+ ProgressReference can be used within Completions:
+
+ +---------------+ +---------------------------------+
+ | | then | |
+ | My Completion | +--> | on_complete=ProgressReference() |
+ | | | |
+ +---------------+ +---------------------------------+
+
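+ A minimal sketch (``mgr`` is assumed to be the active MgrModule and
+ ``do_write`` a hypothetical write callback)::
+
+     c = Completion(on_complete=do_write).then(
+         ProgressReference('creating service', mgr=mgr)
+     )
+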
+ """
+ super(ProgressReference, self).__init__()
+ self.progress_id = str(uuid.uuid4())
+ self.message = message
+ self.mgr = mgr
+
+ #: The completion can already have a result, before the write
+ #: operation is effective. progress == 1 means, the services are
+ #: created / removed.
+ self.completion = completion # type: Optional[Callable[[], Completion[float]]]
+
+ #: if an orchestrator module can provide a more detailed
+ #: progress information, it needs to also call ``progress.update()``.
+ self.progress = 0.0
+
+ self._completion_has_result = False
+ self.mgr.all_progress_references.append(self)
+
+ def __str__(self):
+ """
+ ``__str__()`` is used for determining the message for progress events.
+ """
+ return self.message or super(ProgressReference, self).__str__()
+
+ def __call__(self, arg):
+ self._completion_has_result = True
+ if self.progress == 0.0:
+ self.progress = 0.5
+ return arg
+
+ @property
+ def progress(self):
+ return self._progress
+
+ @progress.setter
+ def progress(self, progress):
+ self._progress = progress
+ try:
+ if self.effective:
+ self.mgr.remote("progress", "complete", self.progress_id)
+ self.mgr.all_progress_references = [p for p in self.mgr.all_progress_references if p is not self]
+ else:
+ self.mgr.remote("progress", "update", self.progress_id, self.message,
+ progress,
+ [("origin", "orchestrator")])
+ except ImportError:
+ # If the progress module is disabled that's fine,
+ # they just won't see the output.
+ pass
+
+ @property
+ def effective(self):
+ return self.progress == 1 and self._completion_has_result
+
+ def update(self):
+ def run(progress):
+ self.progress = progress
+ if self.completion:
+ c = self.completion().then(run)
+ self.mgr.process([c._first_promise])
+ else:
+ self.progress = 1
+
+ def fail(self):
+ self._completion_has_result = True
+ self.progress = 1
+
+class Completion(_Promise[T]):
+ """
+ Combines multiple promises into one overall operation.
+
+ :ivar exception: Holds an exception object, if the completion errored.
+
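+ A typical usage sketch (``mgr`` and ``do_write`` are hypothetical; the
+ orchestrator backend is expected to finalize the underlying promises,
+ e.g. via ``mgr.process()``)::
+
+     c = Completion.with_progress('creating service', mgr,
+                                  on_complete=do_write)
+     mgr.process([c])
+     if c.has_result:
+         print(c.result)
+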
+ """
+ def __init__(self,
+ _first_promise=None, # type: Optional["Completion[V]"]
+ value=_Promise.NO_RESULT, # type: Optional[T]
+ on_complete=None # type: Optional[Callable[[T], Union[U, Completion[U]]]]
+ ):
+ super(Completion, self).__init__(_first_promise, value, on_complete)
+
+ @property
+ def _progress_reference(self):
+ # type: () -> Optional[ProgressReference]
+ if hasattr(self._on_complete, 'progress_id'):
+ return self._on_complete
+ return None
+
+ @property
+ def progress_reference(self):
+ # type: () -> Optional[ProgressReference]
+ """
+ The ProgressReference of this Completion. Marks this completion
+ as a write completion.
+ """
+
+ references = [c._progress_reference for c in iter(self) if c._progress_reference is not None]
+ if references:
+ assert len(references) == 1
+ return references[0]
+ return None
+
+ @classmethod
+ def with_progress(cls, # type: Completions[T]
+ message, # type: str
+ mgr,
+ _first_promise=None, # type: Optional["Completions[V]"]
+ value=_Promise.NO_RESULT, # type: Optional[T]
+ on_complete=None, # type: Optional[Callable[[T], Union[U, Completions[U]]]]
+ calc_percent=None # type: Optional[Callable[[], Completions[float]]]
+ ):
+ # type: (...) -> Completions[T]
+
+ c = cls(
+ _first_promise=_first_promise,
+ value=value,
+ on_complete=on_complete
+ ).then(
+ on_complete=ProgressReference(
+ message=message,
+ mgr=mgr,
+ completion=calc_percent
+ )
+ )
+ return c._first_promise
+
+ def fail(self, e):
+ super(Completion, self).fail(e)
+ if self._progress_reference:
+ self._progress_reference.fail()
+
@property
def result(self):
- # type: () -> T
"""
- Return the result of the operation that we were waited
- for. Only valid after calling Orchestrator.wait() on this
+ The result of the operation that we were waiting
+ for. Only valid after calling Orchestrator.process() on this
completion.
"""
- raise NotImplementedError()
+ last = self._last_promise()
+ assert last._state == _Promise.FINISHED
+ return last._value
def result_str(self):
+ """Force a string."""
if self.result is None:
return ''
return str(self.result)
@property
def exception(self):
# type: () -> Optional[Exception]
- """
- Holds an exception object.
- """
- try:
- return self.__exception
- except AttributeError:
- return None
-
- @exception.setter
- def exception(self, value):
- self.__exception = value
+ return self._last_promise()._exception
@property
- def is_read(self):
+ def has_result(self):
# type: () -> bool
- raise NotImplementedError()
+ """
+ Does the operation already have a result?
- @property
- def is_complete(self):
- # type: () -> bool
- raise NotImplementedError()
+ A write operation can already have a result if the orchestrator's
+ configuration has been written persistently. Typically this would
+ indicate that an update had been written to a manifest, but that the
+ update had not necessarily been pushed out to the cluster.
+
+ :return: True if the last promise in the chain has finished.
+ """
+ return self._last_promise()._state == _Promise.FINISHED
@property
def is_errored(self):
return self.exception is not None
@property
- def should_wait(self):
+ def needs_result(self):
# type: () -> bool
- raise NotImplementedError()
+ """
+ Could the external operation be deemed as complete,
+ or should we wait?
+ We must wait for a read operation only if it is not complete.
+ """
+ return not self.is_errored and not self.has_result
+
+ @property
+ def is_finished(self):
+ # type: () -> bool
+ """
+ Could the external operation be deemed as complete,
+ or should we wait?
+ We must wait for a read operation only if it is not complete.
+ """
+ return self.is_errored or (self.has_result)
def raise_if_exception(c):
- # type: (_Completion) -> None
+ # type: (Completion) -> None
"""
:raises OrchestratorError: Some user error or a config error.
:raises Exception: Some internal error
raise e
-class ReadCompletion(_Completion):
- """
- ``Orchestrator`` implementations should inherit from this
- class to implement their own handles to operations in progress, and
- return an instance of their subclass from calls into methods.
- """
-
- def __init__(self):
- pass
-
- @property
- def is_read(self):
- return True
-
- @property
- def should_wait(self):
- """Could the external operation be deemed as complete,
- or should we wait?
- We must wait for a read operation only if it is not complete.
- """
- return not self.is_complete
-
-
-class TrivialReadCompletion(ReadCompletion):
+class TrivialReadCompletion(Completion[T]):
"""
This is the trivial completion simply wrapping a result.
"""
super(TrivialReadCompletion, self).__init__()
-        self._result = result
+        if result:
+            self.finalize(result)
- @property
- def result(self):
- return self._result
-
- @property
- def is_complete(self):
- return True
-
-
-class WriteCompletion(_Completion):
- """
- ``Orchestrator`` implementations should inherit from this
- class to implement their own handles to operations in progress, and
- return an instance of their subclass from calls into methods.
- """
-
- def __init__(self):
- self.progress_id = str(uuid.uuid4())
-
- #: if a orchestrator module can provide a more detailed
- #: progress information, it needs to also call ``progress.update()``.
- self.progress = 0.5
-
- def __str__(self):
- """
- ``__str__()`` is used for determining the message for progress events.
- """
- return super(WriteCompletion, self).__str__()
-
- @property
- def is_persistent(self):
- # type: () -> bool
- """
- Has the operation updated the orchestrator's configuration
- persistently? Typically this would indicate that an update
- had been written to a manifest, but that the update
- had not necessarily been pushed out to the cluster.
- """
- raise NotImplementedError()
-
- @property
- def is_effective(self):
- """
- Has the operation taken effect on the cluster? For example,
- if we were adding a service, has it come up and appeared
- in Ceph's cluster maps?
- """
- raise NotImplementedError()
-
- @property
- def is_complete(self):
- return self.is_errored or (self.is_persistent and self.is_effective)
-
- @property
- def is_read(self):
- return False
-
- @property
- def should_wait(self):
- """Could the external operation be deemed as complete,
- or should we wait?
- We must wait for a write operation only if we know
- it is not persistent yet.
- """
- return not self.is_persistent
-
def _hide_in_features(f):
f._hide_in_features = True
@_hide_in_features
def process(self, completions):
- # type: (List[_Completion]) -> None
+ # type: (List[Completion]) -> None
"""
Given a list of Completion instances, process any which are
incomplete.
return features
def add_host(self, host):
- # type: (str) -> WriteCompletion
+ # type: (str) -> Completion
"""
Add a host to the orchestrator inventory.
raise NotImplementedError()
def remove_host(self, host):
- # type: (str) -> WriteCompletion
+ # type: (str) -> Completion
"""
Remove a host from the orchestrator inventory.
raise NotImplementedError()
def get_hosts(self):
- # type: () -> ReadCompletion[List[InventoryNode]]
+ # type: () -> Completion[List[InventoryNode]]
"""
Report the hosts in the cluster.
return NotImplementedError()
def get_inventory(self, node_filter=None, refresh=False):
- # type: (InventoryFilter, bool) -> ReadCompletion[List[InventoryNode]]
+ # type: (InventoryFilter, bool) -> Completion[List[InventoryNode]]
"""
Returns something that was created by `ceph-volume inventory`.
raise NotImplementedError()
def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
- # type: (Optional[str], Optional[str], Optional[str], bool) -> ReadCompletion[List[ServiceDescription]]
+ # type: (Optional[str], Optional[str], Optional[str], bool) -> Completion[List[ServiceDescription]]
"""
Describe a service (of any kind) that is already configured in
the orchestrator. For example, when viewing an OSD in the dashboard
raise NotImplementedError()
def service_action(self, action, service_type, service_name=None, service_id=None):
- # type: (str, str, str, str) -> WriteCompletion
+ # type: (str, str, str, str) -> Completion
"""
Perform an action (start/stop/reload) on a service.
:param service_type: e.g. "mds", "rgw", ...
:param service_name: name of logical service ("cephfs", "us-east", ...)
:param service_id: service daemon instance (usually a short hostname)
- :rtype: WriteCompletion
+ :rtype: Completion
"""
- assert action in ["start", "stop", "reload", "restart", "redeploy"]
- assert service_name or service_id
- assert not (service_name and service_id)
+ # assert action in ["start", "stop", "reload", "restart", "redeploy"]
+ # assert service_name or service_id
+ # assert not (service_name and service_id)
raise NotImplementedError()
- def create_osds(self, drive_group, all_hosts):
- # type: (DriveGroupSpec, List[str]) -> WriteCompletion
+ def create_osds(self, drive_group):
+ # type: (DriveGroupSpec) -> Completion
"""
Create one or more OSDs within a single Drive Group.
"""
raise NotImplementedError()
- def remove_osds(self, osd_ids, destroy=False):
- # type: (List[str], bool) -> WriteCompletion
+ def remove_osds(self, osd_ids):
+ # type: (List[str]) -> Completion
"""
:param osd_ids: list of OSD IDs
-        :param destroy: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
raise NotImplementedError()
def update_mgrs(self, num, hosts):
- # type: (int, List[str]) -> WriteCompletion
+ # type: (int, List[str]) -> Completion
"""
Update the number of cluster managers.
raise NotImplementedError()
def update_mons(self, num, hosts):
- # type: (int, List[Tuple[str,str]]) -> WriteCompletion
+ # type: (int, List[Tuple[str,str]]) -> Completion
"""
Update the number of cluster monitors.
raise NotImplementedError()
def add_mds(self, spec):
- # type: (StatelessServiceSpec) -> WriteCompletion
+ # type: (StatelessServiceSpec) -> Completion
"""Create a new MDS cluster"""
raise NotImplementedError()
def remove_mds(self, name):
- # type: (str) -> WriteCompletion
+ # type: (str) -> Completion
"""Remove an MDS cluster"""
raise NotImplementedError()
def update_mds(self, spec):
- # type: (StatelessServiceSpec) -> WriteCompletion
+ # type: (StatelessServiceSpec) -> Completion
"""
Update / redeploy existing MDS cluster
Like for example changing the number of service instances.
raise NotImplementedError()
def add_nfs(self, spec):
- # type: (NFSServiceSpec) -> WriteCompletion
+ # type: (NFSServiceSpec) -> Completion
"""Create a new MDS cluster"""
raise NotImplementedError()
def remove_nfs(self, name):
- # type: (str) -> WriteCompletion
+ # type: (str) -> Completion
"""Remove a NFS cluster"""
raise NotImplementedError()
def update_nfs(self, spec):
- # type: (NFSServiceSpec) -> WriteCompletion
+ # type: (NFSServiceSpec) -> Completion
"""
Update / redeploy existing NFS cluster
Like for example changing the number of service instances.
raise NotImplementedError()
def add_rgw(self, spec):
- # type: (RGWSpec) -> WriteCompletion
+ # type: (RGWSpec) -> Completion
"""Create a new MDS zone"""
raise NotImplementedError()
def remove_rgw(self, zone):
- # type: (str) -> WriteCompletion
+ # type: (str) -> Completion
"""Remove a RGW zone"""
raise NotImplementedError()
def update_rgw(self, spec):
- # type: (StatelessServiceSpec) -> WriteCompletion
+ # type: (StatelessServiceSpec) -> Completion
"""
Update / redeploy existing RGW zone
Like for example changing the number of service instances.
@_hide_in_features
def upgrade_start(self, upgrade_spec):
- # type: (UpgradeSpec) -> WriteCompletion
+ # type: (UpgradeSpec) -> Completion
raise NotImplementedError()
@_hide_in_features
def upgrade_status(self):
- # type: () -> ReadCompletion[UpgradeStatusSpec]
+ # type: () -> Completion[UpgradeStatusSpec]
"""
If an upgrade is currently underway, report on where
we are in the process, or if some error has occurred.
@_hide_in_features
def upgrade_available(self):
- # type: () -> ReadCompletion[List[str]]
+ # type: () -> Completion[List[str]]
"""
Report on what versions are available to upgrade to
def __repr__(self):
return "<InventoryNode>({name})".format(name=self.name)
+ @staticmethod
+ def get_host_names(nodes):
+ # type: (List[InventoryNode]) -> List[str]
+ return [node.name for node in nodes]
+
class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev'])):
"""
def shim(method_name):
def inner(self, *args, **kwargs):
completion = self._oremote(method_name, args, kwargs)
- self._update_completion_progress(completion, 0)
return completion
return inner
mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs))
return mgr.remote(o, meth, *args, **kwargs)
- def _update_completion_progress(self, completion, force_progress=None):
- # type: (WriteCompletion, Optional[float]) -> None
- try:
- progress = force_progress if force_progress is not None else completion.progress
- if completion.is_complete:
- self.remote("progress", "complete", completion.progress_id)
- else:
- self.remote("progress", "update", completion.progress_id, str(completion), progress,
- [("origin", "orchestrator")])
- except AttributeError:
- # No WriteCompletion. Ignore.
- pass
- except ImportError:
- # If the progress module is disabled that's fine,
- # they just won't see the output.
- pass
def _orchestrator_wait(self, completions):
- # type: (List[_Completion]) -> None
+ # type: (List[Completion]) -> None
"""
Wait for completions to complete (reads) or
become persistent (writes).
:raises RuntimeError: something went wrong while calling the process method.
:raises ImportError: no `orchestrator_cli` module or backend not found.
"""
- for c in completions:
- self._update_completion_progress(c)
- while any(not c.is_complete for c in completions):
- self.wait(completions)
+ while any(not c.has_result for c in completions):
+ self.process(completions)
self.__get_mgr().log.info("Operations pending: %s",
- sum(1 for c in completions if not c.is_complete))
- if any(c.should_wait for c in completions):
+ sum(1 for c in completions if not c.has_result))
+ if any(c.needs_result for c in completions):
time.sleep(1)
else:
break
- for c in completions:
- self._update_completion_progress(c)
class OutdatableData(object):
from ceph.deployment import inventory
try:
- from typing import List, Dict
+ from typing import List, Dict, Optional, Callable
from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
pass # just for type checking
from .rook_cluster import RookCluster
-class RookCompletionMixin(object):
- # hacky global
- all_completions = []
-
- def __init__(self, message):
- super(RookCompletionMixin, self).__init__()
- self.message = message
-
- # Result of k8s API call, this is set if executed==True
- self._result = None
-
- all_completions.append(self)
-
-
- @property
- def result(self):
- return self._result
-
- def __str__(self):
- return self.message
-
-
-class RookReadCompletion(RookCompletionMixin, orchestrator.ReadCompletion):
- """
- All reads are simply API calls: avoid spawning
- huge numbers of threads by just running them
- inline when someone calls wait()
- """
-
- def __init__(self, cb):
- super(RookReadCompletion, self).__init__("<read op>")
- self.cb = cb
- self._complete = False
-
- @property
- def has_result(self):
- return self._complete
-
- def execute(self):
- try:
- self._result = self.cb()
- except Exception as e:
- self.exception = e
- finally:
- self._complete = True
-
-
-class RookWriteCompletion(RookCompletionMixin, orchestrator.WriteCompletion):
+class RookWriteCompletion(orchestrator.Completion):
"""
Writes are a two-phase thing, firstly sending
the write to the k8s API (fast) and then waiting
for the corresponding change to appear in the
Ceph cluster (slow)
"""
- # XXX kubernetes bindings call_api already usefully has
- # a completion= param that uses threads. Maybe just
- # use that?
- def __init__(self, execute_cb, complete_cb, message):
- super(RookWriteCompletion, self).__init__(message)
- self.execute_cb = execute_cb
- self.complete_cb = complete_cb
-
- # Executed means I executed my k8s API call, it may or may
- # not have succeeded
- self.executed = False
-
- # Effective means, Rook finished applying the changes
- self.effective = False
-
- self.id = str(uuid.uuid4())
-
- @property
-
- @property
- def is_effective(self):
- return self.effective
-
- def execute(self):
- try:
- if not self.executed:
- self._result = self.execute_cb()
- self.executed = True
-
- if not self.effective:
- # TODO: check self.result for API errors
- if self.complete_cb is None:
- self.effective = True
- else:
- self.effective = self.complete_cb()
- except Exception as e:
- self.exception = e
+ def __init__(self, message):
+ self.progress_reference = orchestrator.ProgressReference(
+ message=message
+ )
+ super(RookWriteCompletion, self).__init__()
def deferred_read(f):
+ # type: (Callable) -> Callable[..., orchestrator.Completion]
"""
-    Decorator to make RookOrchestrator methods return
-    a completion object that executes themselves.
+    Decorator to make RookOrchestrator methods return
+    a completion object that executes the wrapped method lazily.
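+
+    Usage sketch (illustrative; ``get_inventory`` stands in for any
+    RookOrchestrator method that should run lazily when the returned
+    completion is processed)::
+
+        @deferred_read
+        def get_inventory(self, node_filter=None, refresh=False):
+            return self._get_inventory(node_filter)  # hypothetical helper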
"""
@functools.wraps(f)
- def wrapper(*args, **kwargs):
- return RookReadCompletion(lambda: f(*args, **kwargs))
+ def wrapper(self, *args, **kwargs):
+ c = orchestrator.Completion()
+ # ``on_complete`` is invoked with the promise's (unused) value; forward
+ # ``self`` so the wrapped method is called with its original arguments.
+ c.then(
+     lambda _: f(self, *args, **kwargs)
+ )
+ return c
return wrapper
]
def process(self, completions):
- if completions:
- self.log.info("wait: completions={0}".format(completions))
+ pass
+
+ def process_promises(self, promises):
+ # type: (List[RookPromise]) -> None
+ if promises:
+ self.log.info("wait: promises={0}".format(promises))
# Synchronously call the K8s API
- for c in completions:
- if not isinstance(c, RookReadCompletion) and \
- not isinstance(c, RookWriteCompletion):
+ for p in promises:
+ if not isinstance(p, RookPromise):
raise TypeError(
"wait() requires list of completions, not {0}".format(
- c.__class__
+ p.__class__
))
- if c.is_complete:
+ if not p.needs_result:
continue
- c.execute()
+ new_promise = p.execute()
- if c.exception and not isinstance(c.exception, orchestrator.OrchestratorError):
+ if p.exception and not isinstance(p.exception, orchestrator.OrchestratorError):
self.log.exception("Completion {0} threw an exception:".format(
- c.message
+ p.message
))
- c.exception = e
- c._complete = True
+ p._complete = True
self._shutdown = threading.Event()
+ self.all_promises = list() # type: List[RookPromise]
+
def shutdown(self):
self._shutdown.set()
# in case we had a caller that wait()'ed on them long enough
# to get persistence but not long enough to get completion
- self.wait(RookCompletionMixin.all_completions)
- RookCompletionMixin.all_completions = [c for c in RookCompletionMixin.all_completions if
- not c.is_finished]
+ self.process(self.all_promises)
+ self.all_promises = [p for p in self.all_promises if not p.completion.is_finished]
self._shutdown.wait(5)
lambda: self.rook_cluster.update_nfs_count(spec.name, num), None,
"Updating NFS server count in {0} to {1}".format(spec.name, num))
- def create_osds(self, drive_group, all_hosts):
+ def create_osds(self, drive_group, _=None):
- # type: (DriveGroupSpec, List[str]) -> RookWriteCompletion
+ # type: (DriveGroupSpec, Optional[List[str]]) -> orchestrator.Completion
- assert len(drive_group.hosts(all_hosts)) == 1
targets = []
if drive_group.data_devices:
targets += drive_group.data_devices.paths
if drive_group.data_directories:
targets += drive_group.data_directories
- if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]):
- raise RuntimeError("Node '{0}' is not in the Kubernetes "
- "cluster".format(drive_group.hosts(all_hosts)))
+ p = orchestrator.ProgressReference(
+ "Creating OSD on {0}:{1}".format(drive_group.hosts(drive_group.host_pattern),
+ targets),
+ mgr=self)
- # Validate whether cluster CRD can accept individual OSD
- # creations (i.e. not useAllDevices)
- if not self.rook_cluster.can_create_osd():
- raise RuntimeError("Rook cluster configuration does not "
- "support OSD creation.")
+ def execute(all_hosts):
+ p.effective_when(
+ lambda hosts: has_osds
+ )
- def execute():
+ assert len(drive_group.hosts(all_hosts)) == 1
+
+ if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]):
+ raise RuntimeError("Node '{0}' is not in the Kubernetes "
+ "cluster".format(drive_group.hosts(all_hosts)))
+
+ # Validate whether cluster CRD can accept individual OSD
+ # creations (i.e. not useAllDevices)
+ if not self.rook_cluster.can_create_osd():
+ raise RuntimeError("Rook cluster configuration does not "
+ "support OSD creation.")
return self.rook_cluster.add_osds(drive_group, all_hosts)
- def is_complete():
+ @deferred_read
+ def has_osds(all_hosts):
# Find OSD pods on this host
pod_osd_ids = set()
pods = self._k8s.list_namespaced_pod(self._rook_env.namespace,
return found is not None
- return RookWriteCompletion(execute, is_complete,
- "Creating OSD on {0}:{1}".format(
- drive_group.hosts(all_hosts)[0], targets
- ))
+
+ c = self.get_hosts().then(orchestrator.InventoryNode.get_host_names).then(execute)
+ c.progress_reference = p
+ return c
+