from mgr_module import MgrModule, Option
import orchestrator
+from orchestrator import handle_orch_error, OrchResult, raise_if_exception
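+# handle_orch_error converts a method's return value, or any exception it
+# raises, into an OrchResult; callers unwrap that with raise_if_exception:
+#
+#     @handle_orch_error
+#     def get_hosts(self) -> List[HostSpec]: ...
+#
+#     hosts = raise_if_exception(self.get_hosts())  # plain List[HostSpec]
+#
+# This replaces the old Completion-based plumbing throughout this module.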
from .rook_cluster import RookCluster
ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec)
-
-class RookCompletion(orchestrator.Completion[T]):
- def evaluate(self) -> None:
- self.finalize(None)
-
-
-def deferred_read(f):
- # type: (Callable[..., T]) -> Callable[..., RookCompletion[T]]
-
- # See https://stackoverflow.com/questions/65936408/typing-function-when-decorator-change-generic-return-type
- """
- Decorator to make RookOrchestrator methods return
- a completion object that executes themselves.
- """
-
- @functools.wraps(f)
- def wrapper(*args: Any, **kwargs: Any) -> RookCompletion[T]:
- return RookCompletion(on_complete=lambda _: f(*args, **kwargs))
-
- return wrapper
-
-
-def write_completion(on_complete, # type: Callable[[], T]
- message, # type: str
- mgr: 'RookOrchestrator',
- calc_percent=None # type: Optional[Callable[[], RookCompletion]]
- ):
- # type: (...) -> RookCompletion[T]
- return RookCompletion.with_progress(
- message=message,
- mgr=mgr,
- on_complete=lambda _: on_complete(),
- calc_percent=calc_percent,
- )
-
-
class RookEnv(object):
def __init__(self) -> None:
        # POD_NAMESPACE already exists for Rook 0.9
# TODO: configure k8s API addr instead of assuming local
]
- def process(self, completions: List[RookCompletion]) -> None: # type: ignore
- if completions:
- self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))
-
- for p in completions:
- p.evaluate()
-
@staticmethod
def can_run() -> Tuple[bool, str]:
if not kubernetes_imported:
self._shutdown = threading.Event()
- self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
-
def shutdown(self) -> None:
self._shutdown.set()
self._initialized.set()
while not self._shutdown.is_set():
- # XXX hack (or is it?) to kick all completions periodically,
- # in case we had a caller that wait()'ed on them long enough
- # to get persistence but not long enough to get completion
-
- self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
- for p in self.all_progress_references:
- p.update()
-
self._shutdown.wait(5)
- def cancel_completions(self) -> None:
- for p in self.all_progress_references:
- p.fail()
- self.all_progress_references.clear()
-
- @deferred_read
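+    # Read-only queries no longer build RookCompletion objects: the decorated
+    # method runs synchronously and its result (or exception) is packed into
+    # an OrchResult.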
+ @handle_orch_error
def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
host_list = None
if host_filter and host_filter.hosts:
sys_api = dict(
rotational = '1' if d['rotational'] else '0',
size = d['size']
- ),
+ ),
available = False,
rejected_reasons=['device data coming from ceph-volume not provided'],
))
return result
- @deferred_read
+ @handle_orch_error
def get_hosts(self):
# type: () -> List[orchestrator.HostSpec]
return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
- @deferred_read
+ @handle_orch_error
def describe_service(self,
service_type: Optional[str] = None,
service_name: Optional[str] = None,
spec = {}
if service_type == 'mon' or service_type is None:
spec['mon'] = orchestrator.ServiceDescription(
- spec=ServiceSpec(
- 'mon',
- placement=PlacementSpec(
- count=cl['spec'].get('mon', {}).get('count', 1),
- ),
- ),
- size=cl['spec'].get('mon', {}).get('count', 1),
- container_image_name=image_name,
- last_refresh=now,
- )
+ spec=ServiceSpec(
+ 'mon',
+ placement=PlacementSpec(
+ count=cl['spec'].get('mon', {}).get('count', 1),
+ ),
+ ),
+ size=cl['spec'].get('mon', {}).get('count', 1),
+ container_image_name=image_name,
+ last_refresh=now,
+ )
if service_type == 'mgr' or service_type is None:
spec['mgr'] = orchestrator.ServiceDescription(
- spec=ServiceSpec(
- 'mgr',
- placement=PlacementSpec.from_string('count:1'),
- ),
- size=1,
- container_image_name=image_name,
- last_refresh=now,
- )
+ spec=ServiceSpec(
+ 'mgr',
+ placement=PlacementSpec.from_string('count:1'),
+ ),
+ size=1,
+ container_image_name=image_name,
+ last_refresh=now,
+ )
if not cl['spec'].get('crashCollector', {}).get('disable', False):
spec['crash'] = orchestrator.ServiceDescription(
spec=ServiceSpec(
if service_type == 'mds' or service_type is None:
# CephFilesystems
all_fs = self.rook_cluster.rook_api_get(
- "cephfilesystems/")
+ "cephfilesystems/")
self.log.debug('CephFilesystems %s' % all_fs)
for fs in all_fs.get('items', []):
svc = 'mds.' + fs['metadata']['name']
if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
total_mds = active * 2
spec[svc] = orchestrator.ServiceDescription(
- spec=ServiceSpec(
- service_type='mds',
- service_id=fs['metadata']['name'],
- placement=PlacementSpec(count=active),
- ),
- size=total_mds,
- container_image_name=image_name,
- last_refresh=now,
- )
+ spec=ServiceSpec(
+ service_type='mds',
+ service_id=fs['metadata']['name'],
+ placement=PlacementSpec(count=active),
+ ),
+ size=total_mds,
+ container_image_name=image_name,
+ last_refresh=now,
+ )
if service_type == 'rgw' or service_type is None:
# CephObjectstores
all_zones = self.rook_cluster.rook_api_get(
- "cephobjectstores/")
+ "cephobjectstores/")
self.log.debug('CephObjectstores %s' % all_zones)
for zone in all_zones.get('items', []):
rgw_realm = zone['metadata']['name']
ssl = False
port = zone['spec']['gateway']['port'] or 80
spec[svc] = orchestrator.ServiceDescription(
- spec=RGWSpec(
- service_id=rgw_realm + '.' + rgw_zone,
- rgw_realm=rgw_realm,
- rgw_zone=rgw_zone,
- ssl=ssl,
- rgw_frontend_port=port,
- placement=PlacementSpec(count=active),
- ),
- size=active,
- container_image_name=image_name,
- last_refresh=now,
- )
+ spec=RGWSpec(
+ service_id=rgw_realm + '.' + rgw_zone,
+ rgw_realm=rgw_realm,
+ rgw_zone=rgw_zone,
+ ssl=ssl,
+ rgw_frontend_port=port,
+ placement=PlacementSpec(count=active),
+ ),
+ size=active,
+ container_image_name=image_name,
+ last_refresh=now,
+ )
if service_type == 'nfs' or service_type is None:
# CephNFSes
all_nfs = self.rook_cluster.rook_api_get(
- "cephnfses/")
+ "cephnfses/")
self.log.warning('CephNFS %s' % all_nfs)
for nfs in all_nfs.get('items', []):
nfs_name = nfs['metadata']['name']
continue
active = nfs['spec'].get('server', {}).get('active')
spec[svc] = orchestrator.ServiceDescription(
- spec=NFSServiceSpec(
- service_id=nfs_name,
- pool=nfs['spec']['rados']['pool'],
- namespace=nfs['spec']['rados'].get('namespace', None),
- placement=PlacementSpec(count=active),
- ),
- size=active,
- last_refresh=now,
- )
+ spec=NFSServiceSpec(
+ service_id=nfs_name,
+ pool=nfs['spec']['rados']['pool'],
+ namespace=nfs['spec']['rados'].get('namespace', None),
+ placement=PlacementSpec(count=active),
+ ),
+ size=active,
+ last_refresh=now,
+ )
for dd in self._list_daemons():
if dd.service_name() not in spec:
return [v for k, v in spec.items()]
- @deferred_read
+ @handle_orch_error
def list_daemons(self,
service_name: Optional[str] = None,
daemon_type: Optional[str] = None,
return result
- def _service_add_decorate(self, typename: str, spec: ServiceSpecT, func: Callable[[ServiceSpecT], T]) -> RookCompletion[T]:
- return write_completion(
- on_complete=lambda : func(spec),
- message="Creating {} services for {}".format(typename, spec.service_id),
- mgr=self
- )
-
- def _service_rm_decorate(self, typename: str, name: str, func: Callable[[], T]) -> RookCompletion[T]:
- return write_completion(
- on_complete=lambda : func(),
- message="Removing {} services for {}".format(typename, name),
- mgr=self
- )
-
- def remove_service(self, service_name: str) -> RookCompletion[str]:
+ @handle_orch_error
+ def remove_service(self, service_name: str) -> str:
service_type, service_name = service_name.split('.', 1)
if service_type == 'mds':
- return self._service_rm_decorate(
- 'MDS', service_name, lambda: self.rook_cluster.rm_service(
- 'cephfilesystems', service_name)
- )
+ return self.rook_cluster.rm_service('cephfilesystems', service_name)
elif service_type == 'rgw':
- return self._service_rm_decorate(
- 'RGW', service_name, lambda: self.rook_cluster.rm_service('cephobjectstores', service_name)
- )
+ return self.rook_cluster.rm_service('cephobjectstores', service_name)
elif service_type == 'nfs':
- return self._service_rm_decorate(
- 'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
- )
+ return self.rook_cluster.rm_service('cephnfses', service_name)
else:
raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
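+    # The write operations below follow the same pattern: do the work
+    # synchronously and return a status string for the decorator to wrap in
+    # an OrchResult.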
+ @handle_orch_error
def apply_mon(self, spec):
- # type: (ServiceSpec) -> RookCompletion[str]
+ # type: (ServiceSpec) -> str
if spec.placement.hosts or spec.placement.label:
raise RuntimeError("Host list or label is not supported by rook.")
- return write_completion(
- lambda: self.rook_cluster.update_mon_count(spec.placement.count),
- "Updating mon count to {0}".format(spec.placement.count),
- mgr=self
- )
+ return self.rook_cluster.update_mon_count(spec.placement.count)
+ @handle_orch_error
def apply_mds(self, spec):
- # type: (ServiceSpec) -> RookCompletion[str]
- return self._service_add_decorate('MDS', spec,
- self.rook_cluster.apply_filesystem)
+ # type: (ServiceSpec) -> str
+ return self.rook_cluster.apply_filesystem(spec)
+ @handle_orch_error
def apply_rgw(self, spec):
- # type: (RGWSpec) -> RookCompletion[str]
- return self._service_add_decorate('RGW', spec,
- self.rook_cluster.apply_objectstore)
+ # type: (RGWSpec) -> str
+ return self.rook_cluster.apply_objectstore(spec)
+ @handle_orch_error
def apply_nfs(self, spec):
- # type: (NFSServiceSpec) -> RookCompletion[str]
- return self._service_add_decorate("NFS", spec,
- self.rook_cluster.apply_nfsgw)
+ # type: (NFSServiceSpec) -> str
+ return self.rook_cluster.apply_nfsgw(spec)
- def remove_daemons(self, names: List[str]) -> RookCompletion[List[str]]:
- return write_completion(
- lambda: self.rook_cluster.remove_pods(names),
- "Removing daemons {}".format(','.join(names)),
- mgr=self
- )
+ @handle_orch_error
+ def remove_daemons(self, names: List[str]) -> List[str]:
+ return self.rook_cluster.remove_pods(names)
+ @handle_orch_error
def create_osds(self, drive_group):
- # type: (DriveGroupSpec) -> RookCompletion[str]
+ # type: (DriveGroupSpec) -> str
""" Creates OSDs from a drive group specification.
$: ceph orch osd create -i <dg.file>
if drive_group.data_directories:
targets += drive_group.data_directories
- def execute(all_hosts_):
- # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
- matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts_)
-
- assert len(matching_hosts) == 1
-
- if not self.rook_cluster.node_exists(matching_hosts[0]):
- raise RuntimeError("Node '{0}' is not in the Kubernetes "
- "cluster".format(matching_hosts))
-
- # Validate whether cluster CRD can accept individual OSD
- # creations (i.e. not useAllDevices)
- if not self.rook_cluster.can_create_osd():
- raise RuntimeError("Rook cluster configuration does not "
- "support OSD creation.")
-
- return orchestrator.Completion.with_progress(
- message="Creating OSD on {0}:{1}".format(
- matching_hosts,
- targets),
- mgr=self,
- on_complete=lambda _:self.rook_cluster.add_osds(drive_group, matching_hosts),
- calc_percent=lambda: has_osds(matching_hosts)
- )
-    @deferred_read
+        all_hosts = raise_if_exception(self.get_hosts())
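+        # (get_hosts() is itself decorated with @handle_orch_error and
+        # returns an OrchResult, hence the raise_if_exception unwrap above.)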
+ matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
+
+ assert len(matching_hosts) == 1
+
+ if not self.rook_cluster.node_exists(matching_hosts[0]):
+ raise RuntimeError("Node '{0}' is not in the Kubernetes "
+ "cluster".format(matching_hosts))
+
+ # Validate whether cluster CRD can accept individual OSD
+ # creations (i.e. not useAllDevices)
+ if not self.rook_cluster.can_create_osd():
+ raise RuntimeError("Rook cluster configuration does not "
+ "support OSD creation.")
+
+ return self.rook_cluster.add_osds(drive_group, matching_hosts)
+
+ # TODO: this was the code to update the progress reference:
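+        # (OrchResult has no equivalent of calc_percent, so the helper is
+        # kept only as a disabled reference inside this string literal.)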
+ """
+ @handle_orch_error
def has_osds(matching_hosts: List[str]) -> bool:
# Find OSD pods on this host
pod_osd_ids = set()
pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
- label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
- field_selector="spec.nodeName={0}".format(
- matching_hosts[0]
- )).items
+ label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
+ field_selector="spec.nodeName={0}".format(
+ matching_hosts[0]
+ )).items
for p in pods:
pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
))
return found is not None
+ """
- c = self.get_hosts().then(execute)
- return c
-
- def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
- return write_completion(
- on_complete=lambda: self.rook_cluster.blink_light(
- ident_fault, on, locs),
- message="Switching <{}> identification light in {}".format(
- on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
- mgr=self
- )
+ @handle_orch_error
+ def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
+ return self.rook_cluster.blink_light(ident_fault, on, locs)