import threading
import functools
import os
-import uuid
from ceph.deployment import inventory
try:
- from typing import List, Dict, Optional, Callable
+ from typing import List, Dict, Optional, Callable, TypeVar
from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
pass # just for type checking
+T = TypeVar('T')
+
+
try:
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from .rook_cluster import RookCluster
-class RookWriteCompletion(orchestrator.Completion):
- """
- Writes are a two-phase thing, firstly sending
- the write to the k8s API (fast) and then waiting
- for the corresponding change to appear in the
- Ceph cluster (slow)
- """
- def __init__(self, message):
- self.progress_reference = orchestrator.ProgressReference(
- message=message
- )
- super(RookWriteCompletion, self).__init__()
+class RookCompletion(orchestrator.Completion[T]):
+ def evaluate(self):
+ self._first_promise.finalize(None)
def deferred_read(f):
- # type: (Callable) -> Callable[..., orchestrator.Completion]
+ # type: (Callable[..., T]) -> Callable[..., RookCompletion[T]]
"""
Decorator to make RookOrchestrator methods return
a completion object that executes themselves.
"""
@functools.wraps(f)
- def wrapper(self, *args, **kwargs):
- c = orchestrator.Completion()
- c.then(
- lambda: f(*args, **kwargs)
- )
- return c
+ def wrapper(*args, **kwargs):
+ return RookCompletion(on_complete=lambda _: f(*args, **kwargs))
return wrapper
+def write_completion(on_complete, # type: Callable[[], T]
+ message, # type: str
+ mgr,
+ calc_percent=None # type: Optional[Callable[[], RookCompletion[float]]]
+ ):
+ # type: (...) -> RookCompletion[T]
+ return RookCompletion.with_progress(
+ message=message,
+ mgr=mgr,
+ on_complete=lambda _: on_complete(),
+ calc_percent=calc_percent,
+ )
+
+
class RookEnv(object):
def __init__(self):
# POD_NAMESPACE already exist for Rook 0.9
class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
+ """
+ Writes are a two-phase thing, firstly sending
+ the write to the k8s API (fast) and then waiting
+ for the corresponding change to appear in the
+ Ceph cluster (slow)
+
+ Right now, wre calling the k8s API synchronously.
+ """
+
MODULE_OPTIONS = [
# TODO: configure k8s API addr instead of assuming local
]
def process(self, completions):
- pass
-
- def process_promises(self, promises):
- # type: (List[RookPromise]) -> None
-
-
- if promises:
- self.log.info("wait: promises={0}".format(promises))
-
- # Synchronously call the K8s API
- for p in promises:
- if not isinstance(p, RookPromise):
- raise TypeError(
- "wait() requires list of completions, not {0}".format(
- p.__class__
- ))
-
- if not p.needs_result:
- continue
-
- new_promise = p.execute()
- if c.exception and not isinstance(c.exception, orchestrator.OrchestratorError):
- self.log.exception("Completion {0} threw an exception:".format(
- c.message
- ))
- c._complete = True
+ # type: (List[RookCompletion]) -> None
+ if completions:
+ self.log.info("wait: promises={0}".format(completions))
+ for p in completions:
+ p.evaluate()
@staticmethod
def can_run():
self._shutdown = threading.Event()
- self.all_promises = list() # type: List[RookPromise]
+ self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
def shutdown(self):
self._shutdown.set()
# in case we had a caller that wait()'ed on them long enough
# to get persistence but not long enough to get completion
- self.process(self.all_promises)
- self.all_promises = [p for p in self.all_promises if not p.completion.is_finished]
+ self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
+ for p in self.all_progress_references:
+ p.update()
self._shutdown.wait(5)
@deferred_read
def get_hosts(self):
+ # type: () -> List[orchestrator.InventoryNode]
return [orchestrator.InventoryNode(n, inventory.Devices([])) for n in self.rook_cluster.get_node_names()]
@deferred_read
return result
def _service_add_decorate(self, typename, spec, func):
- return RookWriteCompletion(lambda: func(spec), None,
- "Creating {0} services for {1}".format(typename, spec.name))
+ return write_completion(
+ on_complete=lambda : func(spec),
+ message="Creating {} services for {}".format(typename, spec.name),
+ mgr=self
+ )
def add_mds(self, spec):
- return self._service_add_decorate("Filesystem", spec,
- self.rook_cluster.add_filesystem)
+ return self._service_add_decorate('MDS', spec,
+ self.rook_cluster.add_filesystem)
def add_rgw(self, spec):
- return self._service_add_decorate("RGW", spec,
- self.rook_cluster.add_objectstore)
+ return self._service_add_decorate('RGW', spec,
+ self.rook_cluster.add_objectstore)
def add_nfs(self, spec):
return self._service_add_decorate("NFS", spec,
self.rook_cluster.add_nfsgw)
+ def _service_rm_decorate(self, typename, name, func):
+ return write_completion(
+ on_complete=lambda : func(name),
+ message="Removing {} services for {}".format(typename, name),
+ mgr=self
+ )
+
def remove_mds(self, name):
- return RookWriteCompletion(
- lambda: self.rook_cluster.rm_service('cephfilesystems', name), None,
- "Removing {0} services for {1}".format('mds', name))
+ return self._service_rm_decorate(
+ 'MDS', name, lambda: self.rook_cluster.rm_service('cephfilesystems', name)
+ )
def remove_rgw(self, zone):
- return RookWriteCompletion(
- lambda: self.rook_cluster.rm_service('cephobjectstores', zone), None,
- "Removing {0} services for {1}".format('rgw', zone))
+ return self._service_rm_decorate(
+ 'RGW', zone, lambda: self.rook_cluster.rm_service('cephobjectstores', zone)
+ )
def remove_nfs(self, name):
- return RookWriteCompletion(
- lambda: self.rook_cluster.rm_service('cephnfses', name), None,
- "Removing {0} services for {1}".format('nfs', name))
+ return self._service_rm_decorate(
+ 'NFS', name, lambda: self.rook_cluster.rm_service('cephnfses', name)
+ )
def update_mons(self, num, hosts):
if hosts:
raise RuntimeError("Host list is not supported by rook.")
- return RookWriteCompletion(
- lambda: self.rook_cluster.update_mon_count(num), None,
- "Updating mon count to {0}".format(num))
+ return write_completion(
+ lambda: self.rook_cluster.update_mon_count(num),
+ "Updating mon count to {0}".format(num),
+ mgr=self
+ )
def update_mds(self, spec):
num = spec.count
def update_nfs(self, spec):
num = spec.count
- return RookWriteCompletion(
- lambda: self.rook_cluster.update_nfs_count(spec.name, num), None,
- "Updating NFS server count in {0} to {1}".format(spec.name, num))
+ return write_completion(
+ lambda: self.rook_cluster.update_nfs_count(spec.name, num),
+ "Updating NFS server count in {0} to {1}".format(spec.name, num),
+ mgr=self
+ )
- def create_osds(self, drive_group, _):
- # type: (DriveGroupSpec, List[str]) -> RookWriteCompletion
+ def create_osds(self, drive_group):
+ # type: (DriveGroupSpec) -> RookCompletion
- targets = []
+ targets = [] # type: List[str]
if drive_group.data_devices:
targets += drive_group.data_devices.paths
if drive_group.data_directories:
targets += drive_group.data_directories
- p = orchestrator.ProgressReference(
- "Creating OSD on {0}:{1}".format(drive_group.hosts(drive_group.host_pattern),
- targets))
-
- def execute(all_hosts):
- p.effective_when(
- lambda hosts: has_osds
- )
+ def execute(all_hosts_):
+ all_hosts = orchestrator.InventoryNode.get_host_names(all_hosts_)
assert len(drive_group.hosts(all_hosts)) == 1
if not self.rook_cluster.can_create_osd():
raise RuntimeError("Rook cluster configuration does not "
"support OSD creation.")
- return self.rook_cluster.add_osds(drive_group, all_hosts)
+
+ return orchestrator.Completion.with_progress(
+ message="Creating OSD on {0}:{1}".format(
+ drive_group.hosts(drive_group.host_pattern),
+ targets),
+ mgr=self,
+ on_complete=lambda _:self.rook_cluster.add_osds(drive_group, all_hosts),
+ calc_percent=lambda: has_osds(all_hosts)
+ )
@deferred_read
def has_osds(all_hosts):
return found is not None
-
c = self.get_hosts().then(execute)
- c.progress_reference = p
return c
-