From bf552615f22b8e9446f4d0dd96c9a97c3ae626a0 Mon Sep 17 00:00:00 2001
From: Sebastian Wagner
Date: Fri, 30 Aug 2019 15:10:10 +0200
Subject: [PATCH] mgr/rook: Adapt to new completions

Signed-off-by: Sebastian Wagner
---
 src/pybind/mgr/rook/module.py | 178 ++++++++++++++++++----------------
 1 file changed, 92 insertions(+), 86 deletions(-)

diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index 41c4c442d01..473e21fb534 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -1,16 +1,18 @@
 import threading
 import functools
 import os
-import uuid
 
 from ceph.deployment import inventory
 
 try:
-    from typing import List, Dict, Optional, Callable
+    from typing import List, Dict, Optional, Callable, TypeVar
     from ceph.deployment.drive_group import DriveGroupSpec
 except ImportError:
     pass # just for type checking
 
+T = TypeVar('T')
+
+
 try:
     from kubernetes import client, config
     from kubernetes.client.rest import ApiException
@@ -34,38 +36,39 @@ import orchestrator
 from .rook_cluster import RookCluster
 
 
-class RookWriteCompletion(orchestrator.Completion):
-    """
-    Writes are a two-phase thing, firstly sending
-    the write to the k8s API (fast) and then waiting
-    for the corresponding change to appear in the
-    Ceph cluster (slow)
-    """
-    def __init__(self, message):
-        self.progress_reference = orchestrator.ProgressReference(
-            message=message
-        )
-        super(RookWriteCompletion, self).__init__()
+class RookCompletion(orchestrator.Completion[T]):
+    def evaluate(self):
+        self._first_promise.finalize(None)
 
 
 def deferred_read(f):
-    # type: (Callable) -> Callable[..., orchestrator.Completion]
+    # type: (Callable[..., T]) -> Callable[..., RookCompletion[T]]
     """
     Decorator to make RookOrchestrator methods return
    a completion object that executes themselves.
    """

    @functools.wraps(f)
-    def wrapper(self, *args, **kwargs):
-        c = orchestrator.Completion()
-        c.then(
-            lambda: f(*args, **kwargs)
-        )
-        return c
+    def wrapper(*args, **kwargs):
+        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))
 
     return wrapper
 
 
+def write_completion(on_complete,  # type: Callable[[], T]
+                     message,  # type: str
+                     mgr,
+                     calc_percent=None  # type: Optional[Callable[[], RookCompletion[float]]]
+                     ):
+    # type: (...) -> RookCompletion[T]
+    return RookCompletion.with_progress(
+        message=message,
+        mgr=mgr,
+        on_complete=lambda _: on_complete(),
+        calc_percent=calc_percent,
+    )
+
+
 class RookEnv(object):
     def __init__(self):
         # POD_NAMESPACE already exist for Rook 0.9
@@ -86,39 +89,27 @@ class RookEnv(object):
 
 
 class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
+    """
+    Writes are a two-phase process: first we send
+    the write to the k8s API (fast), then we wait
+    for the corresponding change to appear in the
+    Ceph cluster (slow).
+
+    Right now, we're calling the k8s API synchronously.
+    """
+
     MODULE_OPTIONS = [
         # TODO: configure k8s API addr instead of assuming local
     ]
 
     def process(self, completions):
-        pass
-
-    def process_promises(self, promises):
-        # type: (List[RookPromise]) -> None
-
-
-        if promises:
-            self.log.info("wait: promises={0}".format(promises))
-
-            # Synchronously call the K8s API
-            for p in promises:
-                if not isinstance(p, RookPromise):
-                    raise TypeError(
-                        "wait() requires list of completions, not {0}".format(
-                            p.__class__
-                        ))
-
-                if not p.needs_result:
-                    continue
-
-                new_promise = p.execute()
-                if c.exception and not isinstance(c.exception, orchestrator.OrchestratorError):
-                    self.log.exception("Completion {0} threw an exception:".format(
-                        c.message
-                    ))
-                    c._complete = True
+        # type: (List[RookCompletion]) -> None
+        if completions:
+            self.log.info("wait: promises={0}".format(completions))
+            for p in completions:
+                p.evaluate()
 
     @staticmethod
     def can_run():
@@ -151,7 +142,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         self._shutdown = threading.Event()
 
-        self.all_promises = list() # type: List[RookPromise]
+        self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
 
     def shutdown(self):
         self._shutdown.set()
@@ -208,8 +199,9 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             # in case we had a caller that wait()'ed on them long enough
             # to get persistence but not long enough to get completion
 
-            self.process(self.all_promises)
-            self.all_promises = [p for p in self.all_promises if not p.completion.is_finished]
+            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
+            for p in self.all_progress_references:
+                p.update()
 
             self._shutdown.wait(5)
 
@@ -247,6 +239,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
     @deferred_read
     def get_hosts(self):
+        # type: () -> List[orchestrator.InventoryNode]
         return [orchestrator.InventoryNode(n, inventory.Devices([])) for n in self.rook_cluster.get_node_names()]
 
     @deferred_read
@@ -299,43 +292,55 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         return result
 
     def _service_add_decorate(self, typename, spec, func):
-        return RookWriteCompletion(lambda: func(spec), None,
-                    "Creating {0} services for {1}".format(typename, spec.name))
+        return write_completion(
+            on_complete=lambda: func(spec),
+            message="Creating {} services for {}".format(typename, spec.name),
+            mgr=self
+        )
 
     def add_mds(self, spec):
-        return self._service_add_decorate("Filesystem", spec,
-                                          self.rook_cluster.add_filesystem)
+        return self._service_add_decorate('MDS', spec,
+                                          self.rook_cluster.add_filesystem)
 
     def add_rgw(self, spec):
-        return self._service_add_decorate("RGW", spec,
-                                          self.rook_cluster.add_objectstore)
+        return self._service_add_decorate('RGW', spec,
+                                          self.rook_cluster.add_objectstore)
 
     def add_nfs(self, spec):
         return self._service_add_decorate("NFS", spec,
                                           self.rook_cluster.add_nfsgw)
 
+    def _service_rm_decorate(self, typename, name, func):
+        return write_completion(
+            on_complete=lambda: func(name),
+            message="Removing {} services for {}".format(typename, name),
+            mgr=self
+        )
+
     def remove_mds(self, name):
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.rm_service('cephfilesystems', name), None,
-            "Removing {0} services for {1}".format('mds', name))
+        return self._service_rm_decorate(
+            'MDS', name, lambda: self.rook_cluster.rm_service('cephfilesystems', name)
+        )
 
     def remove_rgw(self, zone):
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.rm_service('cephobjectstores', zone), None,
-            "Removing {0} services for {1}".format('rgw', zone))
+        return self._service_rm_decorate(
+            'RGW', zone, lambda: self.rook_cluster.rm_service('cephobjectstores', zone)
+        )
 
     def remove_nfs(self, name):
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.rm_service('cephnfses', name), None,
-            "Removing {0} services for {1}".format('nfs', name))
+        return self._service_rm_decorate(
+            'NFS', name, lambda: self.rook_cluster.rm_service('cephnfses', name)
+        )
 
     def update_mons(self, num, hosts):
         if hosts:
             raise RuntimeError("Host list is not supported by rook.")
 
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.update_mon_count(num), None,
-            "Updating mon count to {0}".format(num))
+        return write_completion(
+            lambda: self.rook_cluster.update_mon_count(num),
+            "Updating mon count to {0}".format(num),
+            mgr=self
+        )
 
     def update_mds(self, spec):
         num = spec.count
@@ -345,27 +350,23 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
     def update_nfs(self, spec):
         num = spec.count
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.update_nfs_count(spec.name, num), None,
-            "Updating NFS server count in {0} to {1}".format(spec.name, num))
+        return write_completion(
+            lambda: self.rook_cluster.update_nfs_count(spec.name, num),
+            "Updating NFS server count in {0} to {1}".format(spec.name, num),
+            mgr=self
+        )
 
-    def create_osds(self, drive_group, _):
-        # type: (DriveGroupSpec, List[str]) -> RookWriteCompletion
+    def create_osds(self, drive_group):
+        # type: (DriveGroupSpec) -> RookCompletion
 
-        targets = []
+        targets = []  # type: List[str]
         if drive_group.data_devices:
             targets += drive_group.data_devices.paths
         if drive_group.data_directories:
             targets += drive_group.data_directories
 
-        p = orchestrator.ProgressReference(
-            "Creating OSD on {0}:{1}".format(drive_group.hosts(drive_group.host_pattern),
-                                             targets))
-
-        def execute(all_hosts):
-            p.effective_when(
-                lambda hosts: has_osds
-            )
+        def execute(all_hosts_):
+            all_hosts = orchestrator.InventoryNode.get_host_names(all_hosts_)
 
             assert len(drive_group.hosts(all_hosts)) == 1
 
@@ -378,7 +379,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             if not self.rook_cluster.can_create_osd():
                 raise RuntimeError("Rook cluster configuration does not "
                                    "support OSD creation.")
-            return self.rook_cluster.add_osds(drive_group, all_hosts)
+
+            return orchestrator.Completion.with_progress(
+                message="Creating OSD on {0}:{1}".format(
+                    drive_group.hosts(drive_group.host_pattern),
+                    targets),
+                mgr=self,
+                on_complete=lambda _: self.rook_cluster.add_osds(drive_group, all_hosts),
+                calc_percent=lambda: has_osds(all_hosts)
+            )
 
         @deferred_read
         def has_osds(all_hosts):
@@ -411,8 +420,5 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
             return found is not None
 
-        c = self.get_hosts().then(execute)
-        c.progress_reference = p
-        return c
-
+        return self.get_hosts().then(execute)
-- 
2.39.5
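
Notes on the new completion flow (a minimal sketch, not part of the patch):
the change above makes orchestrator methods return lazily-evaluated
completions that the module's process() loop drives, instead of executing
eagerly. The Completion class below is a simplified stand-in for
orchestrator.Completion, written only to illustrate the control flow that
RookCompletion.evaluate(), deferred_read() and process() rely on; the real
API differs in detail, and get_hosts() here is a toy function.

    import functools

    class Completion(object):
        def __init__(self, on_complete=None):
            self._on_complete = on_complete
            self._next = []         # completions chained via then()
            self._first = self      # root of the chain, like _first_promise
            self.result = None
            self.is_finished = False

        def then(self, on_complete):
            # Chain a follow-up step; it receives this step's result.
            c = Completion(on_complete)
            c._first = self._first
            self._next.append(c)
            return c

        def _run(self, value):
            self.result = self._on_complete(value) if self._on_complete else value
            self.is_finished = True
            for child in self._next:
                child._run(self.result)

        def evaluate(self):
            # Drive the whole chain from its first promise, as
            # RookCompletion.evaluate() does via _first_promise.finalize().
            self._first._run(None)

    def deferred_read(f):
        # As in the patch: wrap a read so it returns a Completion
        # instead of a value.
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            return Completion(on_complete=lambda _: f(*args, **kwargs))
        return wrapper

    def process(completions):
        # The module's process() hook: nothing executes until this runs.
        for p in completions:
            p.evaluate()

    @deferred_read
    def get_hosts():
        return ['host-a', 'host-b']   # hypothetical inventory

    c = get_hosts().then(lambda hosts: len(hosts))
    process([c])     # the read happens here, not at the get_hosts() call
    print(c.result)  # 2

This is also why create_osds() above ends with self.get_hosts().then(execute)
rather than calling execute() directly: the k8s reads and writes are composed
up front, and the module decides when to run them.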