From 4c5a79ccf2ba118301e9a6d64603391373dfd5ea Mon Sep 17 00:00:00 2001
From: Sebastian Wagner
Date: Mon, 8 Feb 2021 01:47:42 +0100
Subject: [PATCH] mgr/rook: Adapt to new orch interface

Signed-off-by: Sebastian Wagner
(cherry picked from commit 42d5c8991a9b806b960623386c95128515468c6d)

Conflicts:
	src/pybind/mgr/rook/module.py
---
 src/pybind/mgr/rook/module.py       | 309 ++++++++++------------------
 src/pybind/mgr/rook/rook_cluster.py |   4 +-
 2 files changed, 113 insertions(+), 200 deletions(-)

diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index 45c0097dbd3c8..cd9f578a18047 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -32,41 +32,11 @@ except ImportError:
 
 from mgr_module import MgrModule, Option
 import orchestrator
+from orchestrator import handle_orch_error, OrchResult, raise_if_exception
 
 from .rook_cluster import RookCluster
 
 
-class RookCompletion(orchestrator.Completion):
-    def evaluate(self):
-        self.finalize(None)
-
-
-def deferred_read(f):
-    # type: (Callable) -> Callable[..., RookCompletion]
-    """
-    Decorator to make RookOrchestrator methods return
-    a completion object that executes themselves.
-    """
-
-    @functools.wraps(f)
-    def wrapper(*args, **kwargs):
-        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))
-
-    return wrapper
-
-
-def write_completion(on_complete,  # type: Callable
-                     message,  # type: str
-                     mgr,
-                     calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
-                     ):
-    # type: (...) -> RookCompletion
-    return RookCompletion.with_progress(
-        message=message,
-        mgr=mgr,
-        on_complete=lambda _: on_complete(),
-        calc_percent=calc_percent,
-    )
 
 
 class RookEnv(object):
@@ -102,13 +72,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         # TODO: configure k8s API addr instead of assuming local
     ]
 
-    def process(self, completions: List[RookCompletion]) -> None:  # type: ignore
-        if completions:
-            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))
-
-            for p in completions:
-                p.evaluate()
-
     @staticmethod
     def can_run():
         if not kubernetes_imported:
@@ -141,9 +104,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         self._shutdown = threading.Event()
 
-        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
-
-    def shutdown(self):
+    def shutdown(self) -> None:
         self._shutdown.set()
 
     @property
@@ -196,22 +157,9 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._initialized.set()
 
         while not self._shutdown.is_set():
-            # XXX hack (or is it?) to kick all completions periodically,
-            # in case we had a caller that wait()'ed on them long enough
-            # to get persistence but not long enough to get completion
-
-            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
-            for p in self.all_progress_references:
-                p.update()
-
             self._shutdown.wait(5)
 
-    def cancel_completions(self):
-        for p in self.all_progress_references:
-            p.fail()
-        self.all_progress_references.clear()
-
-    @deferred_read
+    @handle_orch_error
     def get_inventory(self, host_filter=None, refresh=False):
         host_list = None
         if host_filter and host_filter.hosts:
@@ -236,7 +184,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                     sys_api = dict(
                         rotational = '1' if d['rotational'] else '0',
                         size = d['size']
-                    ),
+                    ),
                     available = False,
                     rejected_reasons=['device data coming from ceph-volume not provided'],
                 ))
@@ -245,12 +193,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return result
 
-    @deferred_read
+    @handle_orch_error
    def get_hosts(self):
         # type: () -> List[orchestrator.HostSpec]
         return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
 
-    @deferred_read
+    @handle_orch_error
     def describe_service(self, service_type=None, service_name=None,
                          refresh=False):
         now = datetime.datetime.utcnow()
@@ -265,26 +213,26 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         spec = {}
         if service_type == 'mon' or service_type is None:
             spec['mon'] = orchestrator.ServiceDescription(
-                spec=ServiceSpec(
-                    'mon',
-                    placement=PlacementSpec(
-                        count=cl['spec'].get('mon', {}).get('count', 1),
-                    ),
-                ),
-                size=cl['spec'].get('mon', {}).get('count', 1),
-                container_image_name=image_name,
-                last_refresh=now,
-            )
+                spec=ServiceSpec(
+                    'mon',
+                    placement=PlacementSpec(
+                        count=cl['spec'].get('mon', {}).get('count', 1),
+                    ),
+                ),
+                size=cl['spec'].get('mon', {}).get('count', 1),
+                container_image_name=image_name,
+                last_refresh=now,
+            )
         if service_type == 'mgr' or service_type is None:
             spec['mgr'] = orchestrator.ServiceDescription(
-                spec=ServiceSpec(
-                    'mgr',
-                    placement=PlacementSpec.from_string('count:1'),
-                ),
-                size=1,
-                container_image_name=image_name,
-                last_refresh=now,
-            )
+                spec=ServiceSpec(
+                    'mgr',
+                    placement=PlacementSpec.from_string('count:1'),
+                ),
+                size=1,
+                container_image_name=image_name,
+                last_refresh=now,
+            )
         if not cl['spec'].get('crashCollector', {}).get('disable', False):
             spec['crash'] = orchestrator.ServiceDescription(
                 spec=ServiceSpec(
@@ -299,7 +247,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         if service_type == 'mds' or service_type is None:
             # CephFilesystems
             all_fs = self.rook_cluster.rook_api_get(
-                "cephfilesystems/")
+                "cephfilesystems/")
             self.log.debug('CephFilesystems %s' % all_fs)
             for fs in all_fs.get('items', []):
                 svc = 'mds.' + fs['metadata']['name']
@@ -311,20 +259,20 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
                     total_mds = active * 2
                 spec[svc] = orchestrator.ServiceDescription(
-                    spec=ServiceSpec(
-                        service_type='mds',
-                        service_id=fs['metadata']['name'],
-                        placement=PlacementSpec(count=active),
-                    ),
-                    size=total_mds,
-                    container_image_name=image_name,
-                    last_refresh=now,
-                )
+                    spec=ServiceSpec(
+                        service_type='mds',
+                        service_id=fs['metadata']['name'],
+                        placement=PlacementSpec(count=active),
+                    ),
+                    size=total_mds,
+                    container_image_name=image_name,
+                    last_refresh=now,
+                )
 
         if service_type == 'rgw' or service_type is None:
             # CephObjectstores
             all_zones = self.rook_cluster.rook_api_get(
-                "cephobjectstores/")
+                "cephobjectstores/")
             self.log.debug('CephObjectstores %s' % all_zones)
             for zone in all_zones.get('items', []):
                 rgw_realm = zone['metadata']['name']
@@ -340,23 +288,23 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 ssl = False
                 port = zone['spec']['gateway']['port'] or 80
                 spec[svc] = orchestrator.ServiceDescription(
-                    spec=RGWSpec(
-                        service_id=rgw_realm + '.' + rgw_zone,
-                        rgw_realm=rgw_realm,
-                        rgw_zone=rgw_zone,
-                        ssl=ssl,
-                        rgw_frontend_port=port,
-                        placement=PlacementSpec(count=active),
-                    ),
-                    size=active,
-                    container_image_name=image_name,
-                    last_refresh=now,
-                )
+                    spec=RGWSpec(
+                        service_id=rgw_realm + '.' + rgw_zone,
+                        rgw_realm=rgw_realm,
+                        rgw_zone=rgw_zone,
+                        ssl=ssl,
+                        rgw_frontend_port=port,
+                        placement=PlacementSpec(count=active),
+                    ),
+                    size=active,
+                    container_image_name=image_name,
+                    last_refresh=now,
+                )
 
         if service_type == 'nfs' or service_type is None:
             # CephNFSes
             all_nfs = self.rook_cluster.rook_api_get(
-                "cephnfses/")
+                "cephnfses/")
             self.log.warning('CephNFS %s' % all_nfs)
             for nfs in all_nfs.get('items', []):
                 nfs_name = nfs['metadata']['name']
@@ -365,15 +313,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                     continue
                 active = nfs['spec'].get('server', {}).get('active')
                 spec[svc] = orchestrator.ServiceDescription(
-                    spec=NFSServiceSpec(
-                        service_id=nfs_name,
-                        pool=nfs['spec']['rados']['pool'],
-                        namespace=nfs['spec']['rados'].get('namespace', None),
-                        placement=PlacementSpec(count=active),
-                    ),
-                    size=active,
-                    last_refresh=now,
-                )
+                    spec=NFSServiceSpec(
+                        service_id=nfs_name,
+                        pool=nfs['spec']['rados']['pool'],
+                        namespace=nfs['spec']['rados'].get('namespace', None),
+                        placement=PlacementSpec(count=active),
+                    ),
+                    size=active,
+                    last_refresh=now,
+                )
 
         for dd in self._list_daemons():
             if dd.service_name() not in spec:
@@ -391,7 +339,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return [v for k, v in spec.items()]
 
-    @deferred_read
+    @handle_orch_error
     def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None,
                      host=None, refresh=False):
         return self._list_daemons(service_name=service_name,
@@ -440,71 +388,48 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return result
 
-    def _service_add_decorate(self, typename, spec, func):
-        return write_completion(
-            on_complete=lambda : func(spec),
-            message="Creating {} services for {}".format(typename, spec.service_id),
-            mgr=self
-        )
-
-    def _service_rm_decorate(self, typename, name, func):
-        return write_completion(
-            on_complete=lambda : func(),
-            message="Removing {} services for {}".format(typename, name),
-            mgr=self
-        )
-
-    def remove_service(self, service_name):
+    @handle_orch_error
+    def remove_service(self, service_name: str) -> str:
         service_type, service_name = service_name.split('.', 1)
         if service_type == 'mds':
-            return self._service_rm_decorate(
-                'MDS', service_name, lambda: self.rook_cluster.rm_service(
-                    'cephfilesystems', service_name)
-            )
+            return self.rook_cluster.rm_service('cephfilesystems', service_name)
         elif service_type == 'rgw':
-            return self._service_rm_decorate(
-                'RGW', service_name, lambda: self.rook_cluster.rm_service('cephobjectstores', service_name)
-            )
+            return self.rook_cluster.rm_service('cephobjectstores', service_name)
         elif service_type == 'nfs':
-            return self._service_rm_decorate(
-                'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
-            )
+            return self.rook_cluster.rm_service('cephnfses', service_name)
+        else:
+            raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
 
+    @handle_orch_error
     def apply_mon(self, spec):
-        # type: (ServiceSpec) -> RookCompletion
+        # type: (ServiceSpec) -> str
         if spec.placement.hosts or spec.placement.label:
             raise RuntimeError("Host list or label is not supported by rook.")
 
-        return write_completion(
-            lambda: self.rook_cluster.update_mon_count(spec.placement.count),
-            "Updating mon count to {0}".format(spec.placement.count),
-            mgr=self
-        )
+        return self.rook_cluster.update_mon_count(spec.placement.count)
 
+    @handle_orch_error
     def apply_mds(self, spec):
-        # type: (ServiceSpec) -> RookCompletion
-        return self._service_add_decorate('MDS', spec,
-                                          self.rook_cluster.apply_filesystem)
+        # type: (ServiceSpec) -> str
+        return self.rook_cluster.apply_filesystem(spec)
 
+    @handle_orch_error
     def apply_rgw(self, spec):
-        # type: (RGWSpec) -> RookCompletion
-        return self._service_add_decorate('RGW', spec,
-                                          self.rook_cluster.apply_objectstore)
+        # type: (RGWSpec) -> str
+        return self.rook_cluster.apply_objectstore(spec)
 
+    @handle_orch_error
     def apply_nfs(self, spec):
-        # type: (NFSServiceSpec) -> RookCompletion
-        return self._service_add_decorate("NFS", spec,
-                                          self.rook_cluster.apply_nfsgw)
+        # type: (NFSServiceSpec) -> str
+        return self.rook_cluster.apply_nfsgw(spec)
 
-    def remove_daemons(self, names):
-        return write_completion(
-            lambda: self.rook_cluster.remove_pods(names),
-            "Removing daemons {}".format(','.join(names)),
-            mgr=self
-        )
+    @handle_orch_error
+    def remove_daemons(self, names: List[str]) -> List[str]:
+        return self.rook_cluster.remove_pods(names)
 
+    @handle_orch_error
     def create_osds(self, drive_group):
-        # type: (DriveGroupSpec) -> RookCompletion
+        # type: (DriveGroupSpec) -> str
         """
         Creates OSDs from a drive group specification.
 
        $: ceph orch osd create -i
@@ -518,41 +443,36 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         if drive_group.data_directories:
             targets += drive_group.data_directories
 
-        def execute(all_hosts_):
-            # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
-            matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts_)
-
-            assert len(matching_hosts) == 1
-
-            if not self.rook_cluster.node_exists(matching_hosts[0]):
-                raise RuntimeError("Node '{0}' is not in the Kubernetes "
-                                   "cluster".format(matching_hosts))
-
-            # Validate whether cluster CRD can accept individual OSD
-            # creations (i.e. not useAllDevices)
-            if not self.rook_cluster.can_create_osd():
-                raise RuntimeError("Rook cluster configuration does not "
-                                   "support OSD creation.")
-
-            return orchestrator.Completion.with_progress(
-                message="Creating OSD on {0}:{1}".format(
-                    matching_hosts,
-                    targets),
-                mgr=self,
-                on_complete=lambda _:self.rook_cluster.add_osds(drive_group, matching_hosts),
-                calc_percent=lambda: has_osds(matching_hosts)
-            )
+        all_hosts = raise_if_exception(self.get_hosts())
 
-        @deferred_read
-        def has_osds(matching_hosts):
+        matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
+
+        assert len(matching_hosts) == 1
+
+        if not self.rook_cluster.node_exists(matching_hosts[0]):
+            raise RuntimeError("Node '{0}' is not in the Kubernetes "
+                               "cluster".format(matching_hosts))
+
+        # Validate whether cluster CRD can accept individual OSD
+        # creations (i.e. not useAllDevices)
+        if not self.rook_cluster.can_create_osd():
+            raise RuntimeError("Rook cluster configuration does not "
+                               "support OSD creation.")
+
+        return self.rook_cluster.add_osds(drive_group, matching_hosts)
+
+        # TODO: this was the code to update the progress reference:
+        """
+        @handle_orch_error
+        def has_osds(matching_hosts: List[str]) -> bool:
             # Find OSD pods on this host
             pod_osd_ids = set()
             pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
-                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
-                                                field_selector="spec.nodeName={0}".format(
-                                                    matching_hosts[0]
-                                                )).items
+                                                label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
+                                                field_selector="spec.nodeName={0}".format(
+                                                    matching_hosts[0]
+                                                )).items
             for p in pods:
                 pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
 
@@ -574,15 +494,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
                 ))
 
             return found is not None
+        """
 
-        c = self.get_hosts().then(execute)
-        return c
-
-    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
-        return write_completion(
-            on_complete=lambda: self.rook_cluster.blink_light(
-                ident_fault, on, locs),
-            message="Switching <{}> identification light in {}".format(
-                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
-            mgr=self
-        )
+    @handle_orch_error
+    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
+        return self.rook_cluster.blink_light(ident_fault, on, locs)
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index c842b3579cada..197e1318dd1a2 100644
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -389,7 +389,7 @@ class RookCluster(object):
         else:
             raise
 
-    def apply_filesystem(self, spec: ServiceSpec) -> None:
+    def apply_filesystem(self, spec: ServiceSpec) -> str:
         # TODO use spec.placement
         # TODO warn if spec.extended has entries we don't kow how
         # to action.
@@ -456,7 +456,7 @@ class RookCluster(object):
             cos.CephObjectStore, 'cephobjectstores', name,
             _update_zone, _create_zone)
 
-    def apply_nfsgw(self, spec: NFSServiceSpec) -> None:
+    def apply_nfsgw(self, spec: NFSServiceSpec) -> str:
         # TODO use spec.placement
         # TODO warn if spec.extended has entries we don't kow how
         # to action.
-- 
2.39.5
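
For context on the conversion above: the patch drops the RookCompletion / deferred_read / write_completion machinery and instead decorates orchestrator methods with handle_orch_error, so each method simply returns a value (or raises) and the decorator packages the outcome for the caller, which unwraps it with raise_if_exception (as create_osds() does with self.get_hosts()). The following is a minimal, self-contained sketch of that pattern, not the real definitions from src/pybind/mgr/orchestrator/; OrchResult, handle_orch_error, raise_if_exception and FakeOrchestrator here are simplified stand-ins written only for illustration.

    from functools import wraps
    from typing import Callable, Generic, Optional, TypeVar

    T = TypeVar('T')


    class OrchResult(Generic[T]):
        """Simplified stand-in: holds either a result or the exception raised
        while computing it, mirroring how decorated methods report outcomes."""
        def __init__(self, result=None, exception=None):
            # type: (Optional[T], Optional[Exception]) -> None
            self.result = result
            self.exception = exception


    def raise_if_exception(res):
        # type: (OrchResult[T]) -> T
        """Unwrap an OrchResult, re-raising any stored exception."""
        if res.exception is not None:
            raise res.exception
        return res.result  # type: ignore[return-value]


    def handle_orch_error(f):
        # type: (Callable[..., T]) -> Callable[..., OrchResult[T]]
        """Decorator: run the wrapped method and pack its return value, or the
        exception it raised, into an OrchResult instead of propagating it."""
        @wraps(f)
        def wrapper(*args, **kwargs):
            try:
                return OrchResult(result=f(*args, **kwargs))
            except Exception as e:
                return OrchResult(exception=e)
        return wrapper


    class FakeOrchestrator:
        """Hypothetical example class, not part of the Rook module."""
        @handle_orch_error
        def get_hosts(self):
            # type: () -> list
            return ['host-a', 'host-b']


    if __name__ == '__main__':
        hosts = raise_if_exception(FakeOrchestrator().get_hosts())
        print(hosts)  # ['host-a', 'host-b']

Because the decorated methods no longer return self-evaluating completion objects, callers just unwrap the result when they need it, which is what allows the patch to delete the progress-reference bookkeeping in serve() and the process()/cancel_completions() hooks.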