From: Paul Cuzner Date: Tue, 18 Jun 2019 03:58:17 +0000 (+1200) Subject: mgr/rook: Add feature to gather k8s storageclass information X-Git-Tag: v15.1.0~1998^2~4 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=00cfc5c4028ad452920ae7549641974424882fe1;p=ceph-ci.git mgr/rook: Add feature to gather k8s storageclass information Provide ability to fetch or watch k8s resources, and hold the resulting objects in a dict. A method has been added to the rook module to allow other mgr modules to fetch the state of specific k8s resources. Signed-off-by: Paul Cuzner --- diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py index 51c9bd1e1d1..6cb90666377 100644 --- a/src/pybind/mgr/rook/module.py +++ b/src/pybind/mgr/rook/module.py @@ -9,7 +9,7 @@ except ImportError: pass # just for type checking try: - from kubernetes import client, config + from kubernetes import client, config, watch from kubernetes.client.rest import ApiException kubernetes_imported = True @@ -138,6 +138,161 @@ def deferred_read(f): return wrapper +def threaded(f): + def wrapper(*args, **kwargs): + t = threading.Thread(target=f, args=args, kwargs=kwargs) + t.daemon = True + t.start() + return t + return wrapper + + +class KubernetesResource(object): + """ Generic kubernetes Resource parent class + + The api fetch and watch methods should be common across resource types, + so this class implements the basics, and child classes should implement + anything specific to the resource type + """ + + def __init__(self, api_name, method_name, log): + self.api_name = api_name + self.api = None + self.method_name = method_name + self.health = 'OK' # OK or error reason + self.raw_data = None + self.log = log + + @property + def valid(self): + """ Check that the api, and method are viable """ + + if not hasattr(client, self.api_name): + self.health = "[ERR] : API is invalid" + return False + try: + _api = getattr(client, self.api_name)() + except ApiException: + self.health = "[ERR] : API is inaccessible" + return False + else: + self.api = _api + if not hasattr(_api, self.method_name): + self.health = "[ERR] : {} is an anvalid method of {}".format(self.method_name, self.api_name) + return False + return True + + def fetch(self): + """ Execute the requested api method as a one-off fetch""" + if self.valid: + try: + response = getattr(self.api, self.method_name)() + except ApiException: + self.raw_data = None + self.health = "[ERR] : k8s API call failed against {}.{}".format(self.api_name, + self.method_name) + self.log.error(self.health) + else: + self.last = time.time() + self.raw_data = response + if hasattr(response, 'items'): + self.items.clear() + for item in response.items: + name = item.metadata.name + self.items[name] = item + self.health = "[ERR] : API {}.{} is invalid/inaccessible".format(self.api_name, + self.method_name) + self.log.error(self.health) + + + @property + def data(self): + """ Process raw_data into a consumable dict - Override in the child """ + return self.raw_data + + @property + def resource_version(self): + # metadata is a V1ListMeta object type + if hasattr(self.raw_data, 'metadata'): + return self.raw_data.metadata.resource_version + else: + return None + + @threaded + def watch(self): + """ Start a thread which will use the kubernetes watch client against a resource """ + self.fetch() + if self.api_ok: + self.log.info("[INF] : Attaching resource watcher for k8s " + "{}.{}".format(self.api_name, self.method_name)) + self.watcher = self._watch() + + @threaded + def _watch(self): + """ worker thread that runs the kubernetes watch """ + + res_ver = self.resource_version + self.log.info("[INF] Attaching resource watcher for k8s " + "{}/{}".format(self.api_name, self.method_name)) + w = watch.Watch() + func = getattr(self.api, self.method_name) + + try: + # execute generator to continually watch resource for changes + for item in w.stream(func, resource_version=res_ver, watch=True): + obj = item['object'] + try: + name = obj.metadata.name + except AttributeError: + name = None + self.log.warning("[WRN] {}.{} doesn't contain a metadata.name. " + "Unable to track changes".format(self.api_name, + self.method_name)) + + if item['type'] == 'ADDED': + if self.filter(obj): + if self.items and name: + self.items[name] = obj + + elif item['type'] == 'DELETED': + if self.filter(obj): + if self.items and name: + del self.items[name] + + except AttributeError as e: + self.health = "[ERR] : Unable to attach watcher - incompatible urllib3? ({})".format(e) + self.log.error(self.health) + + + +class StorageClass(KubernetesResource): + + def __init__(self, api_name='StorageV1Api', method_name='list_storage_class', log=None): + KubernetesResource.__init__(self, api_name, method_name, log) + + @property + def data(self): + """ provide a more readable/consumable version of the list_storage_class raw data """ + + pool_lookup = dict() + + # raw_data contains an items list of V1StorageClass objects. Each object contains metadata + # attribute which is a V1ObjectMeta object + for item in self.raw_data.items: + if not item.provisioner.startswith(('ceph.rook', 'cephfs.csi.ceph', "rbd.csi.ceph")): + continue + + sc_name = item.metadata.name + # pool used by ceph csi, blockPool by legacy storageclass definition + pool_name = item.parameters.get('pool', item.parameters.get('blockPool')) + if pool_name in pool_lookup: + pool_lookup[pool_name].append(sc_name) + else: + pool_lookup[pool_name] = list([sc_name]) + + return pool_lookup + + class RookEnv(object): def __init__(self): # POD_NAMESPACE already exist for Rook 0.9 @@ -224,6 +379,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self._k8s = None self._rook_cluster = None self._rook_env = RookEnv() + self.k8s_resource = dict() self._shutdown = threading.Event() @@ -241,6 +397,18 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self._initialized.wait() return self._rook_cluster + def get_k8s_resource(self, resource_type=None, mode='readable'): + """ Return specific k8s resource data """ + + if resource_type in self.k8s_resource: + if mode == 'readable': + return self.k8s_resource[resource_type].data + else: + return self.k8s_resource[resource_type].raw_data + else: + self.log.warning("[WRN] request ignored for non-existent k8s resource - {}".format(resource_type)) + return None + def serve(self): # For deployed clusters, we should always be running inside # a Rook cluster. For development convenience, also support @@ -261,6 +429,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self._k8s = client.CoreV1Api() + self.k8s_object['storageclass'] = StorageClass(log=self.log) + if self.k8s_object['storageclass'].valid: + self.log.info("Fetching available storageclass definitions") + self.k8s_object['storageclass'].watch() + else: + self.log.warning("[WRN] Unable to use k8s API - " + "{}/{}".format(self.k8s_object['storageclass'].api_name, + self.k8s_object['storageclass'].method_name)) + try: # XXX mystery hack -- I need to do an API call from # this context, or subsequent API usage from handle_command