From 29aeebb40f1b45fd41884d2eff57658f7d89e9a7 Mon Sep 17 00:00:00 2001
From: Sebastian Wagner
Date: Fri, 19 Jul 2019 15:44:57 +0200
Subject: [PATCH] mgr/rook: Move KubernetesResource to rook_cluster.py

Use `KubernetesResource` as a cache for pods, inventory maps and nodes.
This is required for the dashboard.

Also:

* properly return K8s nodes
* fix minor issues

Signed-off-by: Sebastian Wagner
---
 src/pybind/mgr/orchestrator.py      |   6 +-
 src/pybind/mgr/rook/module.py       | 196 ++-------------------
 src/pybind/mgr/rook/rook_cluster.py | 260 ++++++++++++++++++++--------
 3 files changed, 198 insertions(+), 264 deletions(-)

diff --git a/src/pybind/mgr/orchestrator.py b/src/pybind/mgr/orchestrator.py
index fe867f15601..ce79e840f14 100644
--- a/src/pybind/mgr/orchestrator.py
+++ b/src/pybind/mgr/orchestrator.py
@@ -123,7 +123,11 @@ def raise_if_exception(c):
         return my_obj
 
     if c.exception is not None:
-        raise copy_to_this_subinterpreter(c.exception)
+        try:
+            e = copy_to_this_subinterpreter(c.exception)
+        except (KeyError, AttributeError):
+            raise Exception(str(c.exception))
+        raise e
 
 
 class ReadCompletion(_Completion):
diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index 6cb90666377..c801e7db760 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -2,14 +2,15 @@ import threading
 import functools
 import os
 import uuid
+
 try:
-    from typing import List
+    from typing import List, Dict
     from ceph.deployment.drive_group import DriveGroupSpec
 except ImportError:
     pass  # just for type checking
 
 try:
-    from kubernetes import client, config, watch
+    from kubernetes import client, config
     from kubernetes.client.rest import ApiException
 
     kubernetes_imported = True
@@ -87,7 +88,7 @@ class RookWriteCompletion(orchestrator.WriteCompletion):
 
         self.message = message
 
-        self.error = None
+        self.exception = None
 
         # XXX hacky global
         global all_completions
@@ -108,10 +109,6 @@ class RookWriteCompletion(orchestrator.WriteCompletion):
     def is_effective(self):
         return self.effective
 
-    @property
-    def is_errored(self):
-        return self.error is not None
-
     def execute(self):
         if not self.executed:
             self._result = self.execute_cb()
@@ -138,161 +135,6 @@ def deferred_read(f):
     return wrapper
 
 
-def threaded(f):
-    def wrapper(*args, **kwargs):
-        t = threading.Thread(target=f, args=args, kwargs=kwargs)
-        t.daemon = True
-        t.start()
-        return t
-    return wrapper
-
-
-class KubernetesResource(object):
-    """ Generic kubernetes Resource parent class
-
-    The api fetch and watch methods should be common across resource types,
-    so this class implements the basics, and child classes should implement
-    anything specific to the resource type
-    """
-
-    def __init__(self, api_name, method_name, log):
-        self.api_name = api_name
-        self.api = None
-        self.method_name = method_name
-        self.health = 'OK'  # OK or error reason
-        self.raw_data = None
-        self.log = log
-
-    @property
-    def valid(self):
-        """ Check that the api, and method are viable """
-
-        if not hasattr(client, self.api_name):
-            self.health = "[ERR] : API is invalid"
-            return False
-        try:
-            _api = getattr(client, self.api_name)()
-        except ApiException:
-            self.health = "[ERR] : API is inaccessible"
-            return False
-        else:
-            self.api = _api
-            if not hasattr(_api, self.method_name):
-                self.health = "[ERR] : {} is an anvalid method of {}".format(self.method_name, self.api_name)
-                return False
-        return True
-
-    def fetch(self):
-        """ Execute the requested api method as a one-off fetch"""
-        if self.valid:
-            try:
-                response = getattr(self.api, self.method_name)()
-            except ApiException:
-                self.raw_data = None
-                self.health = "[ERR] : k8s API call failed against {}.{}".format(self.api_name,
-                                                                                  self.method_name)
-                self.log.error(self.health)
-            else:
-                self.last = time.time()
-                self.raw_data = response
-                if hasattr(response, 'items'):
-                    self.items.clear()
-                    for item in response.items:
-                        name = item.metadata.name
-                        self.items[name] = item
-        self.health = "[ERR] : API {}.{} is invalid/inaccessible".format(self.api_name,
-                                                                         self.method_name)
-        self.log.error(self.health)
-
-
-    @property
-    def data(self):
-        """ Process raw_data into a consumable dict - Override in the child """
-        return self.raw_data
-
-    @property
-    def resource_version(self):
-        # metadata is a V1ListMeta object type
-        if hasattr(self.raw_data, 'metadata'):
-            return self.raw_data.metadata.resource_version
-        else:
-            return None
-
-    @threaded
-    def watch(self):
-        """ Start a thread which will use the kubernetes watch client against a resource """
-        self.fetch()
-        if self.api_ok:
-            self.log.info("[INF] : Attaching resource watcher for k8s "
-                          "{}.{}".format(self.api_name, self.method_name))
-            self.watcher = self._watch()
-
-    @threaded
-    def _watch(self):
-        """ worker thread that runs the kubernetes watch """
-
-        res_ver = self.resource_version
-        self.log.info("[INF] Attaching resource watcher for k8s "
-                      "{}/{}".format(self.api_name, self.method_name))
-        w = watch.Watch()
-        func = getattr(self.api, self.method_name)
-
-        try:
-            # execute generator to continually watch resource for changes
-            for item in w.stream(func, resource_version=res_ver, watch=True):
-                obj = item['object']
-                try:
-                    name = obj.metadata.name
-                except AttributeError:
-                    name = None
-                    self.log.warning("[WRN] {}.{} doesn't contain a metadata.name. "
-                                     "Unable to track changes".format(self.api_name,
-                                                                      self.method_name))
-
-                if item['type'] == 'ADDED':
-                    if self.filter(obj):
-                        if self.items and name:
-                            self.items[name] = obj
-
-                elif item['type'] == 'DELETED':
-                    if self.filter(obj):
-                        if self.items and name:
-                            del self.items[name]
-
-        except AttributeError as e:
-            self.health = "[ERR] : Unable to attach watcher - incompatible urllib3? ({})".format(e)
-            self.log.error(self.health)
-
-
-
-class StorageClass(KubernetesResource):
-
-    def __init__(self, api_name='StorageV1Api', method_name='list_storage_class', log=None):
-        KubernetesResource.__init__(self, api_name, method_name, log)
-
-    @property
-    def data(self):
-        """ provide a more readable/consumable version of the list_storage_class raw data """
-
-        pool_lookup = dict()
-
-        # raw_data contains an items list of V1StorageClass objects. Each object contains metadata
-        # attribute which is a V1ObjectMeta object
-        for item in self.raw_data.items:
-            if not item.provisioner.startswith(('ceph.rook', 'cephfs.csi.ceph', "rbd.csi.ceph")):
-                continue
-
-            sc_name = item.metadata.name
-            # pool used by ceph csi, blockPool by legacy storageclass definition
-            pool_name = item.parameters.get('pool', item.parameters.get('blockPool'))
-            if pool_name in pool_lookup:
-                pool_lookup[pool_name].append(sc_name)
-            else:
-                pool_lookup[pool_name] = list([sc_name])
-
-        return pool_lookup
-
-
 class RookEnv(object):
     def __init__(self):
         # POD_NAMESPACE already exist for Rook 0.9
@@ -301,7 +143,7 @@ class RookEnv(object):
         # ROOK_CEPH_CLUSTER_CRD_NAME is new is Rook 1.0
         self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)
 
-        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', "rook-ceph-system")
+        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
 
         self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
         self.api_name = "ceph.rook.io/" + self.crd_version
@@ -379,7 +221,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._k8s = None
         self._rook_cluster = None
         self._rook_env = RookEnv()
-        self.k8s_resource = dict()
 
         self._shutdown = threading.Event()
 
@@ -397,24 +238,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._initialized.wait()
         return self._rook_cluster
 
-    def get_k8s_resource(self, resource_type=None, mode='readable'):
-        """ Return specific k8s resource data """
-
-        if resource_type in self.k8s_resource:
-            if mode == 'readable':
-                return self.k8s_resource[resource_type].data
-            else:
-                return self.k8s_resource[resource_type].raw_data
-        else:
-            self.log.warning("[WRN] request ignored for non-existent k8s resource - {}".format(resource_type))
-            return None
-
 
     def serve(self):
         # For deployed clusters, we should always be running inside
         # a Rook cluster. For development convenience, also support
         # running outside (reading ~/.kube config)
-        if self._rook_env.cluster_name:
+        if self._rook_env.has_namespace():
             config.load_incluster_config()
             cluster_name = self._rook_env.cluster_name
         else:
@@ -429,15 +258,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         self._k8s = client.CoreV1Api()
 
-        self.k8s_object['storageclass'] = StorageClass(log=self.log)
-        if self.k8s_object['storageclass'].valid:
-            self.log.info("Fetching available storageclass definitions")
-            self.k8s_object['storageclass'].watch()
-        else:
-            self.log.warning("[WRN] Unable to use k8s API - "
-                             "{}/{}".format(self.k8s_object['storageclass'].api_name,
-                                            self.k8s_object['storageclass'].method_name))
-
         try:
             # XXX mystery hack -- I need to do an API call from
             # this context, or subsequent API usage from handle_command
@@ -508,6 +328,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return result
 
+    @deferred_read
+    def get_hosts(self):
+        return [orchestrator.InventoryNode(n, []) for n in self.rook_cluster.get_node_names()]
+
     @deferred_read
     def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
 
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index ef4040754e3..3ffbecd5271 100644
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -6,6 +6,7 @@ call methods.
 
 This module is runnable outside of ceph-mgr, useful for testing.
""" +import threading import logging import json from contextlib import contextmanager @@ -14,23 +15,29 @@ from six.moves.urllib.parse import urljoin # pylint: disable=import-error # Optional kubernetes imports to enable MgrModule.can_run # to behave cleanly. +from mgr_util import merge_dicts + try: from kubernetes.client.rest import ApiException + from kubernetes.client import V1ListMeta, CoreV1Api, V1Pod + from kubernetes import watch except ImportError: - ApiException = None + class ApiException(Exception): pass + + +import orchestrator + try: - import orchestrator from rook.module import RookEnv - from typing import List + from typing import List, Dict except ImportError: pass # just used for type checking. - log = logging.getLogger(__name__) -class ApplyException(Exception): +class ApplyException(orchestrator.OrchestratorError): """ For failures to update the Rook CRDs, usually indicating some kind of interference between our attempted update @@ -38,10 +45,128 @@ class ApplyException(Exception): """ +def threaded(f): + def wrapper(*args, **kwargs): + t = threading.Thread(target=f, args=args, kwargs=kwargs) + t.start() + return t + + return wrapper + + +class KubernetesResource(object): + def __init__(self, api_func, **kwargs): + """ + Generic kubernetes Resource parent class + + The api fetch and watch methods should be common across resource types, + + Exceptions in the runner thread are propagated to the caller. + + :param api_func: kubernetes client api function that is passed to the watcher + :param filter_func: signature: ``(Item) -> bool``. + """ + self.kwargs = kwargs + self.api_func = api_func + + # ``_items`` is accessed by different threads. I assume assignment is atomic. + self._items = dict() + self.thread = None # type: threading.Thread + self.exception = None + + def _fetch(self): + """ Execute the requested api method as a one-off fetch""" + response = self.api_func(**self.kwargs) + # metadata is a V1ListMeta object type + metadata = response.metadata # type: V1ListMeta + self._items = {item.metadata.name: item for item in response.items} + log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items))) + return metadata.resource_version + + @property + def items(self): + """ + Returns the items of the request. + Creates the watcher as a side effect. + :return: + """ + if self.exception: + e = self.exception + self.exception = None + raise e # Propagate the exception to the user. + if not self.thread or not self.thread.is_alive(): + # Start a thread which will use the kubernetes watch client against a resource + resource_version = self._fetch() + log.debug("Attaching resource watcher for k8s {}".format(self.api_func)) + self.thread = self._watch(resource_version) # type: threading.Thread + + return self._items.values() + + @threaded + def _watch(self, res_ver): + """ worker thread that runs the kubernetes watch """ + + self.exception = None + + w = watch.Watch() + + try: + # execute generator to continually watch resource for changes + for event in w.stream(self.api_func, resource_version=res_ver, watch=True, + **self.kwargs): + self.health = '' + item = event['object'] + try: + name = item.metadata.name + except AttributeError: + raise AttributeError( + "{} doesn't contain a metadata.name. 
Unable to track changes".format( + self.api_func)) + + log.info('{} event: {}'.format(event['type'], name)) + + if event['type'] in ('ADDED', 'MODIFIED'): + self._items = merge_dicts(self._items, {name: item}) + elif event['type'] == 'DELETED': + self._items = {k:v for k,v in self._items.items() if k != name} + elif event['type'] == 'BOOKMARK': + pass + elif event['type'] == 'ERROR': + raise ApiException(str(event)) + else: + raise KeyError('Unknown watch event {}'.format(event['type'])) + + except ApiException as e: + log.exception('K8s API failed. {}'.format(self.api_func)) + self.exception = e + raise + except AttributeError as e: + log.exception( + "Unable to attach watcher - incompatible urllib3? ({})".format(self.api_func)) + self.exception = e + raise + except Exception as e: + log.exception("Watcher failed. ({})".format(self.api_func)) + self.exception = e + raise + + class RookCluster(object): def __init__(self, k8s, rook_env): self.rook_env = rook_env # type: RookEnv - self.k8s = k8s + self.k8s = k8s # type: CoreV1Api + + # TODO: replace direct k8s calls with Rook API calls + # when they're implemented + self.inventory_maps = KubernetesResource(self.k8s.list_namespaced_config_map, + namespace=self.rook_env.operator_namespace, + label_selector="app=rook-discover") + + self.rook_pods = KubernetesResource(self.k8s.list_namespaced_pod, + namespace=self.rook_env.namespace, + label_selector="rook_cluster={0}".format( + self.rook_env.cluster_name)) + self.nodes = KubernetesResource(self.k8s.list_node) def rook_url(self, path): prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % ( @@ -76,25 +201,20 @@ class RookCluster(object): return self.rook_api_call("POST", path, **kwargs) def get_discovered_devices(self, nodenames=None): - # TODO: replace direct k8s calls with Rook API calls - # when they're implemented - label_selector = "app=rook-discover" - if nodenames is not None: - # FIXME: is there a practical or official limit on the - # number of entries in a label selector - label_selector += ", rook.io/node in ({0})".format( - ", ".join(nodenames)) + def predicate(item): + if nodenames is not None: + return item.metadata.labels['rook.io/node'] in nodenames + else: + return True try: - result = self.k8s.list_namespaced_config_map( - self.rook_env.operator_namespace, - label_selector=label_selector) + result = [i for i in self.inventory_maps.items if predicate(i)] except ApiException as e: - log.exception("Failed to fetch device metadata: {0}".format(e)) + log.exception("Failed to fetch device metadata") raise nodename_to_devices = {} - for i in result.items: + for i in result: drives = json.loads(i.data['devices']) nodename_to_devices[i.metadata.labels['rook.io/node']] = drives @@ -121,72 +241,66 @@ class RookCluster(object): url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance) return url - def describe_pods(self, service_type, service_id, nodename): - # Go query the k8s API about deployment, containers related to this - # filesystem - - # Inspect the Rook YAML, to decide whether this filesystem - # is Ceph-managed or Rook-managed - # TODO: extend Orchestrator interface to describe whether FS - # is manageable by us or not - - # Example Rook Pod labels for a mgr daemon: - # Labels: app=rook-ceph-mgr - # pod-template-hash=2171958073 - # rook_cluster=rook - # And MDS containers additionally have `rook_filesystem` label - - # Label filter is rook_cluster= - # rook_file_system= - - label_filter = "rook_cluster={0}".format(self.rook_env.cluster_name) - if 
-        if service_type != None:
-            label_filter += ",app=rook-ceph-{0}".format(service_type)
-            if service_id != None:
-                if service_type == "mds":
-                    label_filter += ",rook_file_system={0}".format(service_id)
-                elif service_type == "osd":
-                    # Label added in https://github.com/rook/rook/pull/1698
-                    label_filter += ",ceph-osd-id={0}".format(service_id)
-                elif service_type == "mon":
-                    # label like mon=rook-ceph-mon0
-                    label_filter += ",mon={0}".format(service_id)
-                elif service_type == "mgr":
-                    label_filter += ",mgr={0}".format(service_id)
-                elif service_type == "nfs":
-                    label_filter += ",ceph_nfs={0}".format(service_id)
-                elif service_type == "rgw":
-                    # TODO: rgw
-                    pass
+        """
+        Go query the k8s API about deployment, containers related to this
+        filesystem
 
-        field_filter = ""
-        if nodename != None:
-            field_filter = "spec.nodeName={0}".format(nodename)
+        Example Rook Pod labels for a mgr daemon:
+            Labels:       app=rook-ceph-mgr
+                          pod-template-hash=2171958073
+                          rook_cluster=rook
+        And MDS containers additionally have `rook_filesystem` label
 
-        pods = self.k8s.list_namespaced_pod(
-            self.rook_env.namespace,
-            label_selector=label_filter,
-            field_selector=field_filter)
+        Label filter is rook_cluster=
+                        rook_file_system=
+        """
+        def predicate(item):
+            # type: (V1Pod) -> bool
+            metadata = item.metadata
+            if service_type is not None:
+                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
+                    return False
+
+            if service_id is not None:
+                try:
+                    k, v = {
+                        "mds": ("rook_file_system", service_id),
+                        "osd": ("ceph-osd-id", service_id),
+                        "mon": ("mon", service_id),
+                        "mgr": ("mgr", service_id),
+                        "ceph_nfs": ("ceph_nfs", service_id),
+                        "rgw": ("ceph_rgw", service_id),
+                    }[service_type]
+                except KeyError:
+                    raise orchestrator.OrchestratorValidationError(
+                        '{} not supported'.format(service_type))
+                if metadata.labels[k] != v:
+                    return False
+
+            if nodename is not None:
+                if item.spec.node_name != nodename:
+                    return False
+            return True
 
-        # import json
-        # print json.dumps(pods.items[0])
+        pods = [i for i in self.rook_pods.items if predicate(i)]
 
         pods_summary = []
 
-        for p in pods.items:
+        for p in pods:
             d = p.to_dict()
             # p['metadata']['creationTimestamp']
-            # p['metadata']['nodeName']
             pods_summary.append({
                 "name": d['metadata']['name'],
                 "nodename": d['spec']['node_name'],
                 "labels": d['metadata']['labels']
            })
-            pass
 
         return pods_summary
 
+    def get_node_names(self):
+        return [i.metadata.name for i in self.nodes.items]
+
     @contextmanager
     def ignore_409(self, what):
         try:
@@ -315,15 +429,7 @@ class RookCluster(object):
         return not use_all_nodes
 
     def node_exists(self, node_name):
-        try:
-            self.k8s.read_node(node_name, exact=False, export=True)
-        except ApiException as e:
-            if e.status == 404:
-                return False
-            else:
-                raise
-        else:
-            return True
+        return node_name in self.get_node_names()
 
     def update_mon_count(self, newcount):
         patch = [{"op": "replace", "path": "/spec/mon/count", "value": newcount}]
-- 
2.39.5
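
The commit message describes `KubernetesResource` as a cache for pods, inventory maps and nodes, and rook_cluster.py's docstring notes the module is runnable outside of ceph-mgr. A minimal consumption sketch under those assumptions; the `main()` wrapper, the chosen service type, and the import paths are illustrative only and assume ceph's mgr python path is importable as the `rook` package:

    # Sketch only: driving the KubernetesResource-backed cache from outside ceph-mgr,
    # assuming a reachable cluster, kubeconfig credentials, and the ROOK_*/POD_NAMESPACE
    # environment (or its defaults) pointing at the Rook cluster.
    from kubernetes import client, config

    from rook.module import RookEnv
    from rook.rook_cluster import RookCluster


    def main():
        # Mirrors the development branch of RookOrchestrator.serve(): read ~/.kube/config.
        config.load_kube_config()
        cluster = RookCluster(client.CoreV1Api(), RookEnv())

        # First access of each resource does a full list and starts a watch thread;
        # subsequent accesses are answered from the in-memory dict.
        print(cluster.get_node_names())
        print(cluster.get_discovered_devices())

        # The same cached pod list backs describe_pods() for the dashboard.
        for pod in cluster.describe_pods("mgr", None, None):
            print(pod["name"], pod["nodename"])


    if __name__ == "__main__":
        main()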
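The watch thread in `KubernetesResource._watch()` never mutates the cached dict in place: ADDED/MODIFIED events swap in a merged copy via `merge_dicts`, and DELETED rebuilds the dict without the key, so a reader holding the previous snapshot never sees a half-applied update. A self-contained sketch of that copy-on-write pattern; `merge_dicts` below is a local stand-in assumed to behave like `mgr_util.merge_dicts` (left-to-right merge into a new dict), and `Cache` is not part of the patch:

    def merge_dicts(*args):
        # Stand-in assumed to match mgr_util.merge_dicts: merge left to right into a new dict.
        out = {}
        for arg in args:
            out.update(arg)
        return out


    class Cache(object):
        def __init__(self):
            # Rebinding self._items to a whole new dict is atomic under the GIL, so a
            # concurrent reader sees either the old snapshot or the new one.
            self._items = {}

        def apply(self, event_type, name, item):
            if event_type in ('ADDED', 'MODIFIED'):
                self._items = merge_dicts(self._items, {name: item})
            elif event_type == 'DELETED':
                self._items = {k: v for k, v in self._items.items() if k != name}
            else:
                raise KeyError('Unknown watch event {}'.format(event_type))


    cache = Cache()
    cache.apply('ADDED', 'rook-ceph-mgr-a', {'phase': 'Running'})
    snapshot = cache._items                        # a reader grabs the current snapshot
    cache.apply('DELETED', 'rook-ceph-mgr-a', None)
    assert 'rook-ceph-mgr-a' in snapshot           # the old snapshot is untouched
    assert 'rook-ceph-mgr-a' not in cache._items   # the cache already reflects the delete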
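With the lists cached, `get_discovered_devices()` and `describe_pods()` filter items client-side with a predicate instead of composing kubernetes label/field selectors per request. A self-contained sketch of the same idea; the namedtuple-based `FakePod` objects only model the attributes the predicate reads and are not part of the patch:

    # Sketch: client-side predicate filtering over cached items.
    from collections import namedtuple

    Metadata = namedtuple('Metadata', ['name', 'labels'])
    Spec = namedtuple('Spec', ['node_name'])
    FakePod = namedtuple('FakePod', ['metadata', 'spec'])

    cached_pods = [
        FakePod(Metadata('rook-ceph-mgr-a', {'app': 'rook-ceph-mgr', 'mgr': 'a'}), Spec('node1')),
        FakePod(Metadata('rook-ceph-osd-0', {'app': 'rook-ceph-osd', 'ceph-osd-id': '0'}), Spec('node2')),
    ]


    def predicate(item, service_type=None, nodename=None):
        # Mirrors the shape of the predicates in the patch: every filter that is
        # not None must match, otherwise the cached item is skipped.
        if service_type is not None and item.metadata.labels.get('app') != 'rook-ceph-{}'.format(service_type):
            return False
        if nodename is not None and item.spec.node_name != nodename:
            return False
        return True


    mgr_pods = [p for p in cached_pods if predicate(p, service_type='mgr')]
    assert [p.metadata.name for p in mgr_pods] == ['rook-ceph-mgr-a']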
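The orchestrator.py hunk hedges `raise_if_exception()` against exceptions that cannot be re-instantiated when they are copied between interpreters; if reconstruction fails with `KeyError` or `AttributeError`, the completion's error is re-raised as a plain `Exception` carrying the original message. A self-contained sketch of that failure mode and fallback; `rebuild()`, `DaemonSpec` and `NoDaemonFound` are illustrative stand-ins, not the module's actual `copy_to_this_subinterpreter()` helper or error types:

    class DaemonSpec(object):
        def __init__(self, name):
            self.name = name


    class NoDaemonFound(Exception):
        # The constructor wants a rich object, but Exception.args only keeps the
        # formatted message, so rebuilding the exception from its args fails.
        def __init__(self, daemon):
            super(NoDaemonFound, self).__init__("no daemon: {}".format(daemon.name))


    def rebuild(e):
        # Roughly what a cross-interpreter copy has to do: re-create the exception
        # from its class and stored args.
        return type(e)(*e.args)


    original = NoDaemonFound(DaemonSpec('osd.0'))

    try:
        try:
            raise rebuild(original)
        except (KeyError, AttributeError):
            # Same fallback as the patched raise_if_exception(): keep the message,
            # give up on the original type.
            raise Exception(str(original))
    except Exception as e:
        print(type(e).__name__, e)  # -> Exception no daemon: osd.0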