]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/rook: Add feature to gather k8s storageclass information
authorPaul Cuzner <pcuzner@redhat.com>
Tue, 18 Jun 2019 03:58:17 +0000 (15:58 +1200)
committerSebastian Wagner <sebastian.wagner@suse.com>
Wed, 31 Jul 2019 07:57:33 +0000 (09:57 +0200)
Provide ability to fetch or watch k8s resources,
and hold the resulting objects in a dict. A method
has been added to the rook module to allow other
mgr modules to fetch the state of specific k8s
resources.

Signed-off-by: Paul Cuzner <pcuzner@redhat.com>
src/pybind/mgr/rook/module.py

index 51c9bd1e1d164a809cccf09e14958c1e3302190e..6cb906663772c7fbe33fa218363d5a9cab9f9861 100644 (file)
@@ -9,7 +9,7 @@ except ImportError:
     pass  # just for type checking
 
 try:
-    from kubernetes import client, config
+    from kubernetes import client, config, watch
     from kubernetes.client.rest import ApiException
 
     kubernetes_imported = True
@@ -138,6 +138,161 @@ def deferred_read(f):
     return wrapper
 
 
+def threaded(f):
+    def wrapper(*args, **kwargs):
+        t = threading.Thread(target=f, args=args, kwargs=kwargs)
+        t.daemon = True
+        t.start()
+        return t
+    return wrapper
+
+
+class KubernetesResource(object):
+    """ Generic kubernetes Resource parent class
+
+    The api fetch and watch methods should be common across resource types,
+    so this class implements the basics, and child classes should implement 
+    anything specific to the resource type
+    """
+
+    def __init__(self, api_name, method_name, log):
+        self.api_name = api_name
+        self.api = None
+        self.method_name = method_name
+        self.health = 'OK'                  # OK or error reason
+        self.raw_data = None
+        self.log = log
+
+    @property
+    def valid(self):
+        """ Check that the api, and method are viable """
+
+        if not hasattr(client, self.api_name):
+            self.health = "[ERR] : API is invalid"
+            return False
+        try:
+            _api = getattr(client, self.api_name)()
+        except ApiException:
+            self.health = "[ERR] : API is inaccessible"
+            return False
+        else:
+            self.api = _api
+            if not hasattr(_api, self.method_name):
+                self.health = "[ERR] : {} is an anvalid method of {}".format(self.method_name, self.api_name)
+                return False
+        return True
+
+    def fetch(self):
+        """ Execute the requested api method as a one-off fetch"""
+        if self.valid:
+            try:
+                response = getattr(self.api, self.method_name)()
+            except ApiException:
+                self.raw_data = None
+                self.health = "[ERR] : k8s API call failed against {}.{}".format(self.api_name,
+                                                                                 self.method_name)
+                self.log.error(self.health) 
+            else:
+                self.last = time.time()
+                self.raw_data = response
+                if hasattr(response, 'items'):
+                    self.items.clear()
+                    for item in response.items:
+                        name = item.metadata.name
+                        self.items[name] = item
+            self.health = "[ERR] : API {}.{} is invalid/inaccessible".format(self.api_name,
+                                                                             self.method_name)
+            self.log.error(self.health)
+
+
+    @property
+    def data(self):
+        """ Process raw_data into a consumable dict - Override in the child """
+        return self.raw_data
+
+    @property
+    def resource_version(self):
+        # metadata is a V1ListMeta object type
+        if hasattr(self.raw_data, 'metadata'):
+            return self.raw_data.metadata.resource_version
+        else:
+            return None
+
+    @threaded
+    def watch(self):
+        """ Start a thread which will use the kubernetes watch client against a resource """
+        self.fetch()
+        if self.api_ok:
+            self.log.info("[INF] : Attaching resource watcher for k8s "
+                          "{}.{}".format(self.api_name, self.method_name))
+            self.watcher = self._watch()
+
+    @threaded
+    def _watch(self):
+        """ worker thread that runs the kubernetes watch """
+
+        res_ver = self.resource_version
+            self.log.info("[INF] Attaching resource watcher for k8s "
+                          "{}/{}".format(self.api_name, self.method_name))
+        w = watch.Watch()
+        func = getattr(self.api, self.method_name)
+
+        try:
+            # execute generator to continually watch resource for changes
+            for item in w.stream(func, resource_version=res_ver, watch=True):
+                    obj = item['object']
+                    try:
+                        name = obj.metadata.name
+                    except AttributeError:
+                        name = None
+                        self.log.warning("[WRN] {}.{} doesn't contain a metadata.name. "
+                                         "Unable to track changes".format(self.api_name,
+                                                                          self.method_name))
+
+                if item['type'] == 'ADDED':
+                        if self.filter(obj):
+                            if self.items and name:
+                                self.items[name] = obj
+
+                elif item['type'] == 'DELETED':
+                        if self.filter(obj):
+                            if self.items and name:
+                                del self.items[name]
+
+        except AttributeError as e:
+            self.health = "[ERR] : Unable to attach watcher - incompatible urllib3? ({})".format(e)
+            self.log.error(self.health)
+
+       
+
+class StorageClass(KubernetesResource):
+
+    def __init__(self, api_name='StorageV1Api', method_name='list_storage_class', log=None):
+        KubernetesResource.__init__(self, api_name, method_name, log)
+    
+    @property
+    def data(self):
+        """ provide a more readable/consumable version of the list_storage_class raw data """
+
+        pool_lookup = dict()
+
+        # raw_data contains an items list of V1StorageClass objects. Each object contains metadata
+        # attribute which is a V1ObjectMeta object
+        for item in self.raw_data.items:
+            if not item.provisioner.startswith(('ceph.rook', 'cephfs.csi.ceph', "rbd.csi.ceph")):
+                continue
+            
+            sc_name = item.metadata.name
+            # pool used by ceph csi, blockPool by legacy storageclass definition
+            pool_name = item.parameters.get('pool', item.parameters.get('blockPool'))
+            if pool_name in pool_lookup:
+                    pool_lookup[pool_name].append(sc_name)
+            else:
+                pool_lookup[pool_name] = list([sc_name])
+
+        return pool_lookup
+
+
 class RookEnv(object):
     def __init__(self):
         # POD_NAMESPACE already exist for Rook 0.9
@@ -224,6 +379,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._k8s = None
         self._rook_cluster = None
         self._rook_env = RookEnv()
+        self.k8s_resource = dict()
 
         self._shutdown = threading.Event()
 
@@ -241,6 +397,18 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._initialized.wait()
         return self._rook_cluster
 
+    def get_k8s_resource(self, resource_type=None, mode='readable'):
+        """ Return specific k8s resource data """
+
+        if resource_type in self.k8s_resource:
+            if mode == 'readable':
+                return self.k8s_resource[resource_type].data
+            else:
+                return self.k8s_resource[resource_type].raw_data
+        else:
+            self.log.warning("[WRN] request ignored for non-existent k8s resource - {}".format(resource_type))
+            return None
+
     def serve(self):
         # For deployed clusters, we should always be running inside
         # a Rook cluster.  For development convenience, also support
@@ -261,6 +429,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         self._k8s = client.CoreV1Api()
 
+        self.k8s_object['storageclass'] = StorageClass(log=self.log)
+        if self.k8s_object['storageclass'].valid:
+            self.log.info("Fetching available storageclass definitions")
+            self.k8s_object['storageclass'].watch()
+        else:
+            self.log.warning("[WRN] Unable to use k8s API - "
+                           "{}/{}".format(self.k8s_object['storageclass'].api_name, 
+                                          self.k8s_object['storageclass'].method_name))
+
         try:
             # XXX mystery hack -- I need to do an API call from
             # this context, or subsequent API usage from handle_command