self.cleanup()
-class AsyncCompletion(orchestrator.Completion):
- def __init__(self,
- _first_promise=None, # type: Optional[orchestrator.Completion]
- value=orchestrator._Promise.NO_RESULT, # type: Any
- on_complete=None, # type: Optional[Callable]
- name=None, # type: Optional[str]
- update_progress=False, # type: bool
- ):
-
- assert CephadmOrchestrator.instance is not None
- self.update_progress = update_progress
- if name is None and on_complete is not None:
- name = getattr(on_complete, '__name__', None)
- super(AsyncCompletion, self).__init__(_first_promise, value, on_complete, name)
-
- @property
- def _progress_reference(self):
- # type: () -> Optional[orchestrator.ProgressReference]
- if hasattr(self._on_complete_, 'progress_id'): # type: ignore
- return self._on_complete_ # type: ignore
- return None
-
- @property
- def _on_complete(self):
- # type: () -> Optional[Callable]
- if self._on_complete_ is None:
- return None
-
- def callback(result):
- try:
- if self.update_progress:
- assert self.progress_reference
- self.progress_reference.progress = 1.0
- self._on_complete_ = None
- self._finalize(result)
- except Exception as e:
- try:
- self.fail(e)
- except Exception:
- logger.exception(f'failed to fail AsyncCompletion: >{repr(self)}<')
- if 'UNITTEST' in os.environ:
- assert False
-
- def error_callback(e):
- pass
-
- def run(value):
- def do_work(*args, **kwargs):
- assert self._on_complete_ is not None
- try:
- res = self._on_complete_(*args, **kwargs)
- return res
- except Exception as e:
- self.fail(e)
- raise
-
- assert CephadmOrchestrator.instance
- CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
- callback=callback, error_callback=error_callback)
- return self.ASYNC_RESULT
-
- return run
-
- @_on_complete.setter
- def _on_complete(self, inner):
- # type: (Callable) -> None
- self._on_complete_ = inner
-
-
def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
@wraps(f)
def forall_hosts_wrapper(*args) -> List[T]:
return forall_hosts_wrapper
-def trivial_completion(f):
- # type: (Callable) -> Callable[..., orchestrator.Completion]
+class CephadmCompletion(orchestrator.Completion):
+ def evaluate(self):
+ self.finalize(None)
+
+def trivial_completion(f: Callable) -> Callable[..., CephadmCompletion]:
+ """
+ Decorator to make CephadmCompletion methods return
+ a completion object that executes themselves.
+ """
+
@wraps(f)
def wrapper(*args, **kwargs):
- return AsyncCompletion(value=f(*args, **kwargs), name=f.__name__)
+ return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))
+
return wrapper
self.log.debug("process: completions={0}".format(orchestrator.pretty_print(completions)))
for p in completions:
- p.finalize()
+ p.evaluate()
@orchestrator._cli_write_command(
prefix='cephadm set-ssh-config',