mgr/rook: Move KubernetesResource to rook_cluster.py
author    Sebastian Wagner <sebastian.wagner@suse.com>
          Fri, 19 Jul 2019 13:44:57 +0000 (15:44 +0200)
committer Sebastian Wagner <sebastian.wagner@suse.com>
          Wed, 31 Jul 2019 07:59:36 +0000 (09:59 +0200)
Use `KubernetesResource` as a cache for pods, inventory maps and nodes.
This is required for the dashboard.

Also:
  properly return K8s nodes
  fix minor issues

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
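
As a rough, hedged illustration only (not part of the change itself), the
relocated class is intended to be consumed as a lazily started cache: the
first access to `.items` does a full fetch and spawns a watch thread, and
later accesses read the in-memory dict that the watcher keeps current. The
namespace and label selector below are placeholder values:

    # Sketch: stand-alone use of the relocated KubernetesResource cache.
    # Assumes the `kubernetes` client is installed and an API server is
    # reachable; values such as 'rook-ceph' are placeholders.
    from kubernetes import client, config

    from rook.rook_cluster import KubernetesResource

    config.load_kube_config()  # inside a pod: config.load_incluster_config()
    k8s = client.CoreV1Api()

    rook_pods = KubernetesResource(k8s.list_namespaced_pod,
                                   namespace='rook-ceph',
                                   label_selector='rook_cluster=rook-ceph')

    # First access fetches and starts the watcher; subsequent calls are
    # served from the cached dict, exceptions from the watch thread are
    # re-raised here.
    for pod in rook_pods.items:
        print(pod.metadata.name, pod.spec.node_name)
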
src/pybind/mgr/orchestrator.py
src/pybind/mgr/rook/module.py
src/pybind/mgr/rook/rook_cluster.py

index fe867f15601f37c007fc5ecc5d1ad5062f7a5d92..ce79e840f14971db973274dc6cd5cd70f4663ca8 100644 (file)
@@ -123,7 +123,11 @@ def raise_if_exception(c):
         return my_obj
 
     if c.exception is not None:
-        raise copy_to_this_subinterpreter(c.exception)
+        try:
+            e = copy_to_this_subinterpreter(c.exception)
+        except (KeyError, AttributeError):
+            raise Exception(str(c.exception))
+        raise e
 
 
 class ReadCompletion(_Completion):
index 6cb906663772c7fbe33fa218363d5a9cab9f9861..c801e7db7606cbbb8c970f340c2816d94e666c5a 100644 (file)
@@ -2,14 +2,15 @@ import threading
 import functools
 import os
 import uuid
+
 try:
-    from typing import List
+    from typing import List, Dict
     from ceph.deployment.drive_group import DriveGroupSpec
 except ImportError:
     pass  # just for type checking
 
 try:
-    from kubernetes import client, config, watch
+    from kubernetes import client, config
     from kubernetes.client.rest import ApiException
 
     kubernetes_imported = True
@@ -87,7 +88,7 @@ class RookWriteCompletion(orchestrator.WriteCompletion):
 
         self.message = message
 
-        self.error = None
+        self.exception = None
 
         # XXX hacky global
         global all_completions
@@ -108,10 +109,6 @@ class RookWriteCompletion(orchestrator.WriteCompletion):
     def is_effective(self):
         return self.effective
 
-    @property
-    def is_errored(self):
-        return self.error is not None
-
     def execute(self):
         if not self.executed:
             self._result = self.execute_cb()
@@ -138,161 +135,6 @@ def deferred_read(f):
     return wrapper
 
 
-def threaded(f):
-    def wrapper(*args, **kwargs):
-        t = threading.Thread(target=f, args=args, kwargs=kwargs)
-        t.daemon = True
-        t.start()
-        return t
-    return wrapper
-
-
-class KubernetesResource(object):
-    """ Generic kubernetes Resource parent class
-
-    The api fetch and watch methods should be common across resource types,
-    so this class implements the basics, and child classes should implement 
-    anything specific to the resource type
-    """
-
-    def __init__(self, api_name, method_name, log):
-        self.api_name = api_name
-        self.api = None
-        self.method_name = method_name
-        self.health = 'OK'                  # OK or error reason
-        self.raw_data = None
-        self.log = log
-
-    @property
-    def valid(self):
-        """ Check that the api, and method are viable """
-
-        if not hasattr(client, self.api_name):
-            self.health = "[ERR] : API is invalid"
-            return False
-        try:
-            _api = getattr(client, self.api_name)()
-        except ApiException:
-            self.health = "[ERR] : API is inaccessible"
-            return False
-        else:
-            self.api = _api
-            if not hasattr(_api, self.method_name):
-                self.health = "[ERR] : {} is an anvalid method of {}".format(self.method_name, self.api_name)
-                return False
-        return True
-
-    def fetch(self):
-        """ Execute the requested api method as a one-off fetch"""
-        if self.valid:
-            try:
-                response = getattr(self.api, self.method_name)()
-            except ApiException:
-                self.raw_data = None
-                self.health = "[ERR] : k8s API call failed against {}.{}".format(self.api_name,
-                                                                                 self.method_name)
-                self.log.error(self.health) 
-            else:
-                self.last = time.time()
-                self.raw_data = response
-                if hasattr(response, 'items'):
-                    self.items.clear()
-                    for item in response.items:
-                        name = item.metadata.name
-                        self.items[name] = item
-            self.health = "[ERR] : API {}.{} is invalid/inaccessible".format(self.api_name,
-                                                                             self.method_name)
-            self.log.error(self.health)
-
-
-    @property
-    def data(self):
-        """ Process raw_data into a consumable dict - Override in the child """
-        return self.raw_data
-
-    @property
-    def resource_version(self):
-        # metadata is a V1ListMeta object type
-        if hasattr(self.raw_data, 'metadata'):
-            return self.raw_data.metadata.resource_version
-        else:
-            return None
-
-    @threaded
-    def watch(self):
-        """ Start a thread which will use the kubernetes watch client against a resource """
-        self.fetch()
-        if self.api_ok:
-            self.log.info("[INF] : Attaching resource watcher for k8s "
-                          "{}.{}".format(self.api_name, self.method_name))
-            self.watcher = self._watch()
-
-    @threaded
-    def _watch(self):
-        """ worker thread that runs the kubernetes watch """
-
-        res_ver = self.resource_version
-            self.log.info("[INF] Attaching resource watcher for k8s "
-                          "{}/{}".format(self.api_name, self.method_name))
-        w = watch.Watch()
-        func = getattr(self.api, self.method_name)
-
-        try:
-            # execute generator to continually watch resource for changes
-            for item in w.stream(func, resource_version=res_ver, watch=True):
-                    obj = item['object']
-                    try:
-                        name = obj.metadata.name
-                    except AttributeError:
-                        name = None
-                        self.log.warning("[WRN] {}.{} doesn't contain a metadata.name. "
-                                         "Unable to track changes".format(self.api_name,
-                                                                          self.method_name))
-
-                if item['type'] == 'ADDED':
-                        if self.filter(obj):
-                            if self.items and name:
-                                self.items[name] = obj
-
-                elif item['type'] == 'DELETED':
-                        if self.filter(obj):
-                            if self.items and name:
-                                del self.items[name]
-
-        except AttributeError as e:
-            self.health = "[ERR] : Unable to attach watcher - incompatible urllib3? ({})".format(e)
-            self.log.error(self.health)
-
-       
-
-class StorageClass(KubernetesResource):
-
-    def __init__(self, api_name='StorageV1Api', method_name='list_storage_class', log=None):
-        KubernetesResource.__init__(self, api_name, method_name, log)
-    
-    @property
-    def data(self):
-        """ provide a more readable/consumable version of the list_storage_class raw data """
-
-        pool_lookup = dict()
-
-        # raw_data contains an items list of V1StorageClass objects. Each object contains metadata
-        # attribute which is a V1ObjectMeta object
-        for item in self.raw_data.items:
-            if not item.provisioner.startswith(('ceph.rook', 'cephfs.csi.ceph', "rbd.csi.ceph")):
-                continue
-            
-            sc_name = item.metadata.name
-            # pool used by ceph csi, blockPool by legacy storageclass definition
-            pool_name = item.parameters.get('pool', item.parameters.get('blockPool'))
-            if pool_name in pool_lookup:
-                    pool_lookup[pool_name].append(sc_name)
-            else:
-                pool_lookup[pool_name] = list([sc_name])
-
-        return pool_lookup
-
-
 class RookEnv(object):
     def __init__(self):
         # POD_NAMESPACE already exist for Rook 0.9
@@ -301,7 +143,7 @@ class RookEnv(object):
         # ROOK_CEPH_CLUSTER_CRD_NAME is new is Rook 1.0
         self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)
 
-        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', "rook-ceph-system")
+        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
         self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
         self.api_name = "ceph.rook.io/" + self.crd_version
 
@@ -379,7 +221,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._k8s = None
         self._rook_cluster = None
         self._rook_env = RookEnv()
-        self.k8s_resource = dict()
 
         self._shutdown = threading.Event()
 
@@ -397,24 +238,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._initialized.wait()
         return self._rook_cluster
 
-    def get_k8s_resource(self, resource_type=None, mode='readable'):
-        """ Return specific k8s resource data """
-
-        if resource_type in self.k8s_resource:
-            if mode == 'readable':
-                return self.k8s_resource[resource_type].data
-            else:
-                return self.k8s_resource[resource_type].raw_data
-        else:
-            self.log.warning("[WRN] request ignored for non-existent k8s resource - {}".format(resource_type))
-            return None
-
     def serve(self):
         # For deployed clusters, we should always be running inside
         # a Rook cluster.  For development convenience, also support
         # running outside (reading ~/.kube config)
 
-        if self._rook_env.cluster_name:
+        if self._rook_env.has_namespace():
             config.load_incluster_config()
             cluster_name = self._rook_env.cluster_name
         else:
@@ -429,15 +258,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         self._k8s = client.CoreV1Api()
 
-        self.k8s_object['storageclass'] = StorageClass(log=self.log)
-        if self.k8s_object['storageclass'].valid:
-            self.log.info("Fetching available storageclass definitions")
-            self.k8s_object['storageclass'].watch()
-        else:
-            self.log.warning("[WRN] Unable to use k8s API - "
-                           "{}/{}".format(self.k8s_object['storageclass'].api_name, 
-                                          self.k8s_object['storageclass'].method_name))
-
         try:
             # XXX mystery hack -- I need to do an API call from
             # this context, or subsequent API usage from handle_command
@@ -508,6 +328,10 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return result
 
+    @deferred_read
+    def get_hosts(self):
+        return [orchestrator.InventoryNode(n, []) for n in self.rook_cluster.get_node_names()]
+
     @deferred_read
     def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
 
index ef4040754e3de85a0f04fcfd71310cdfeba2f7e9..3ffbecd5271259cbfc5e4a6a1fa8e8e62578c3cc 100644 (file)
@@ -6,6 +6,7 @@ call methods.
 
 This module is runnable outside of ceph-mgr, useful for testing.
 """
+import threading
 import logging
 import json
 from contextlib import contextmanager
@@ -14,23 +15,29 @@ from six.moves.urllib.parse import urljoin  # pylint: disable=import-error
 
 # Optional kubernetes imports to enable MgrModule.can_run
 # to behave cleanly.
+from mgr_util import merge_dicts
+
 try:
     from kubernetes.client.rest import ApiException
+    from kubernetes.client import V1ListMeta, CoreV1Api, V1Pod
+    from kubernetes import watch
 except ImportError:
-    ApiException = None
+    class ApiException(Exception): pass
+
+
+import orchestrator
+
 
 try:
-    import orchestrator
     from rook.module import RookEnv
-    from typing import List
+    from typing import List, Dict
 except ImportError:
     pass  # just used for type checking.
 
-
 log = logging.getLogger(__name__)
 
 
-class ApplyException(Exception):
+class ApplyException(orchestrator.OrchestratorError):
     """
     For failures to update the Rook CRDs, usually indicating
     some kind of interference between our attempted update
@@ -38,10 +45,128 @@ class ApplyException(Exception):
     """
 
 
+def threaded(f):
+    def wrapper(*args, **kwargs):
+        t = threading.Thread(target=f, args=args, kwargs=kwargs)
+        t.start()
+        return t
+
+    return wrapper
+
+
+class KubernetesResource(object):
+    def __init__(self, api_func, **kwargs):
+        """
+        Generic kubernetes Resource parent class
+
+        The api fetch and watch methods should be common across resource types,
+
+        Exceptions in the runner thread are propagated to the caller.
+
+        :param api_func: kubernetes client api function that is passed to the watcher
+        :param filter_func: signature: ``(Item) -> bool``.
+        """
+        self.kwargs = kwargs
+        self.api_func = api_func
+
+        # ``_items`` is accessed by different threads. I assume assignment is atomic.
+        self._items = dict()
+        self.thread = None  # type: threading.Thread
+        self.exception = None
+
+    def _fetch(self):
+        """ Execute the requested api method as a one-off fetch"""
+        response = self.api_func(**self.kwargs)
+        # metadata is a V1ListMeta object type
+        metadata = response.metadata  # type: V1ListMeta
+        self._items = {item.metadata.name: item for item in response.items}
+        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+        return metadata.resource_version
+
+    @property
+    def items(self):
+        """
+        Returns the items of the request.
+        Creates the watcher as a side effect.
+        :return:
+        """
+        if self.exception:
+            e = self.exception
+            self.exception = None
+            raise e  # Propagate the exception to the user.
+        if not self.thread or not self.thread.is_alive():
+            # Start a thread which will use the kubernetes watch client against a resource
+            resource_version = self._fetch()
+            log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
+            self.thread = self._watch(resource_version)  # type: threading.Thread
+
+        return self._items.values()
+
+    @threaded
+    def _watch(self, res_ver):
+        """ worker thread that runs the kubernetes watch """
+
+        self.exception = None
+
+        w = watch.Watch()
+
+        try:
+            # execute generator to continually watch resource for changes
+            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
+                                  **self.kwargs):
+                self.health = ''
+                item = event['object']
+                try:
+                    name = item.metadata.name
+                except AttributeError:
+                    raise AttributeError(
+                        "{} doesn't contain a metadata.name. Unable to track changes".format(
+                            self.api_func))
+
+                log.info('{} event: {}'.format(event['type'], name))
+
+                if event['type'] in ('ADDED', 'MODIFIED'):
+                    self._items = merge_dicts(self._items, {name: item})
+                elif event['type'] == 'DELETED':
+                    self._items = {k:v for k,v in self._items.items() if k != name}
+                elif event['type'] == 'BOOKMARK':
+                    pass
+                elif event['type'] == 'ERROR':
+                    raise ApiException(str(event))
+                else:
+                    raise KeyError('Unknown watch event {}'.format(event['type']))
+
+        except ApiException as e:
+            log.exception('K8s API failed. {}'.format(self.api_func))
+            self.exception = e
+            raise
+        except AttributeError as e:
+            log.exception(
+                "Unable to attach watcher - incompatible urllib3? ({})".format(self.api_func))
+            self.exception = e
+            raise
+        except Exception as e:
+            log.exception("Watcher failed. ({})".format(self.api_func))
+            self.exception = e
+            raise
+
+
 class RookCluster(object):
     def __init__(self, k8s, rook_env):
         self.rook_env = rook_env  # type: RookEnv
-        self.k8s = k8s
+        self.k8s = k8s  # type: CoreV1Api
+
+        #  TODO: replace direct k8s calls with Rook API calls
+        # when they're implemented
+        self.inventory_maps = KubernetesResource(self.k8s.list_namespaced_config_map,
+                                                 namespace=self.rook_env.operator_namespace,
+                                                 label_selector="app=rook-discover")
+
+        self.rook_pods = KubernetesResource(self.k8s.list_namespaced_pod,
+                                            namespace=self.rook_env.namespace,
+                                            label_selector="rook_cluster={0}".format(
+                                                self.rook_env.cluster_name))
+        self.nodes = KubernetesResource(self.k8s.list_node)
 
     def rook_url(self, path):
         prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
@@ -76,25 +201,20 @@ class RookCluster(object):
         return self.rook_api_call("POST", path, **kwargs)
 
     def get_discovered_devices(self, nodenames=None):
-        # TODO: replace direct k8s calls with Rook API calls
-        # when they're implemented
-        label_selector = "app=rook-discover"
-        if nodenames is not None:
-            # FIXME: is there a practical or official limit on the
-            # number of entries in a label selector
-            label_selector += ", rook.io/node in ({0})".format(
-                ", ".join(nodenames))
+        def predicate(item):
+            if nodenames is not None:
+                return item.metadata.labels['rook.io/node'] in nodenames
+            else:
+                return True
 
         try:
-            result = self.k8s.list_namespaced_config_map(
-                self.rook_env.operator_namespace,
-                label_selector=label_selector)
+            result = [i for i in self.inventory_maps.items if predicate(i)]
         except ApiException as e:
-            log.exception("Failed to fetch device metadata: {0}".format(e))
+            log.exception("Failed to fetch device metadata")
             raise
 
         nodename_to_devices = {}
-        for i in result.items:
+        for i in result:
             drives = json.loads(i.data['devices'])
             nodename_to_devices[i.metadata.labels['rook.io/node']] = drives
 
@@ -121,72 +241,66 @@ class RookCluster(object):
             url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
         return url
 
-
     def describe_pods(self, service_type, service_id, nodename):
-        # Go query the k8s API about deployment, containers related to this
-        # filesystem
-
-        # Inspect the Rook YAML, to decide whether this filesystem
-        # is Ceph-managed or Rook-managed
-        # TODO: extend Orchestrator interface to describe whether FS
-        # is manageable by us or not
-
-        # Example Rook Pod labels for a mgr daemon:
-        # Labels:         app=rook-ceph-mgr
-        #                 pod-template-hash=2171958073
-        #                 rook_cluster=rook
-        # And MDS containers additionally have `rook_filesystem` label
-
-        # Label filter is rook_cluster=<cluster name>
-        #                 rook_file_system=<self.fs_name>
-
-        label_filter = "rook_cluster={0}".format(self.rook_env.cluster_name)
-        if service_type != None:
-            label_filter += ",app=rook-ceph-{0}".format(service_type)
-            if service_id != None:
-                if service_type == "mds":
-                    label_filter += ",rook_file_system={0}".format(service_id)
-                elif service_type == "osd":
-                    # Label added in https://github.com/rook/rook/pull/1698
-                    label_filter += ",ceph-osd-id={0}".format(service_id)
-                elif service_type == "mon":
-                    # label like mon=rook-ceph-mon0
-                    label_filter += ",mon={0}".format(service_id)
-                elif service_type == "mgr":
-                    label_filter += ",mgr={0}".format(service_id)
-                elif service_type == "nfs":
-                    label_filter += ",ceph_nfs={0}".format(service_id)
-                elif service_type == "rgw":
-                    # TODO: rgw
-                    pass
+        """
+        Go query the k8s API about deployment, containers related to this
+        filesystem
 
-        field_filter = ""
-        if nodename != None:
-            field_filter = "spec.nodeName={0}".format(nodename)
+        Example Rook Pod labels for a mgr daemon:
+        Labels:         app=rook-ceph-mgr
+                        pod-template-hash=2171958073
+                        rook_cluster=rook
+        And MDS containers additionally have `rook_filesystem` label
 
-        pods = self.k8s.list_namespaced_pod(
-            self.rook_env.namespace,
-            label_selector=label_filter,
-            field_selector=field_filter)
+        Label filter is rook_cluster=<cluster name>
+                        rook_file_system=<self.fs_name>
+        """
+        def predicate(item):
+            # type: (V1Pod) -> bool
+            metadata = item.metadata
+            if service_type is not None:
+                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
+                    return False
+
+                if service_id is not None:
+                    try:
+                        k, v = {
+                            "mds": ("rook_file_system", service_id),
+                            "osd": ("ceph-osd-id", service_id),
+                            "mon": ("mon", service_id),
+                            "mgr": ("mgr", service_id),
+                            "ceph_nfs": ("ceph_nfs", service_id),
+                            "rgw": ("ceph_rgw", service_id),
+                        }[service_type]
+                    except KeyError:
+                        raise orchestrator.OrchestratorValidationError(
+                            '{} not supported'.format(service_type))
+                    if metadata.labels[k] != v:
+                        return False
+
+            if nodename is not None:
+                if item.spec.node_name != nodename:
+                    return False
+            return True
 
-        # import json
-        # print json.dumps(pods.items[0])
+        pods = [i for i in self.rook_pods.items if predicate(i)]
 
         pods_summary = []
 
-        for p in pods.items:
+        for p in pods:
             d = p.to_dict()
             # p['metadata']['creationTimestamp']
-            # p['metadata']['nodeName']
             pods_summary.append({
                 "name": d['metadata']['name'],
                 "nodename": d['spec']['node_name'],
                 "labels": d['metadata']['labels']
             })
-            pass
 
         return pods_summary
 
+    def get_node_names(self):
+        return [i.metadata.name for i in self.nodes.items]
+
     @contextmanager
     def ignore_409(self, what):
         try:
@@ -315,15 +429,7 @@ class RookCluster(object):
         return not use_all_nodes
 
     def node_exists(self, node_name):
-        try:
-            self.k8s.read_node(node_name, exact=False, export=True)
-        except ApiException as e:
-            if e.status == 404:
-                return False
-            else:
-                raise
-        else:
-            return True
+        return node_name in self.get_node_names()
 
     def update_mon_count(self, newcount):
         patch = [{"op": "replace", "path": "/spec/mon/count", "value": newcount}]