pass # just for type checking
try:
- from kubernetes import client, config
+ from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException
kubernetes_imported = True
return wrapper
+def threaded(f):
+ def wrapper(*args, **kwargs):
+ t = threading.Thread(target=f, args=args, kwargs=kwargs)
+ t.daemon = True
+ t.start()
+ return t
+ return wrapper
+
+
+class KubernetesResource(object):
+ """ Generic kubernetes Resource parent class
+
+ The api fetch and watch methods should be common across resource types,
+ so this class implements the basics, and child classes should implement
+ anything specific to the resource type
+ """
+
+ def __init__(self, api_name, method_name, log):
+ self.api_name = api_name
+ self.api = None
+ self.method_name = method_name
+ self.health = 'OK' # OK or error reason
+ self.raw_data = None
+ self.log = log
+
+ @property
+ def valid(self):
+ """ Check that the api, and method are viable """
+
+ if not hasattr(client, self.api_name):
+ self.health = "[ERR] : API is invalid"
+ return False
+ try:
+ _api = getattr(client, self.api_name)()
+ except ApiException:
+ self.health = "[ERR] : API is inaccessible"
+ return False
+ else:
+ self.api = _api
+ if not hasattr(_api, self.method_name):
+ self.health = "[ERR] : {} is an anvalid method of {}".format(self.method_name, self.api_name)
+ return False
+ return True
+
+ def fetch(self):
+ """ Execute the requested api method as a one-off fetch"""
+ if self.valid:
+ try:
+ response = getattr(self.api, self.method_name)()
+ except ApiException:
+ self.raw_data = None
+ self.health = "[ERR] : k8s API call failed against {}.{}".format(self.api_name,
+ self.method_name)
+ self.log.error(self.health)
+ else:
+ self.last = time.time()
+ self.raw_data = response
+ if hasattr(response, 'items'):
+ self.items.clear()
+ for item in response.items:
+ name = item.metadata.name
+ self.items[name] = item
+ self.health = "[ERR] : API {}.{} is invalid/inaccessible".format(self.api_name,
+ self.method_name)
+ self.log.error(self.health)
+
+
+ @property
+ def data(self):
+ """ Process raw_data into a consumable dict - Override in the child """
+ return self.raw_data
+
+ @property
+ def resource_version(self):
+ # metadata is a V1ListMeta object type
+ if hasattr(self.raw_data, 'metadata'):
+ return self.raw_data.metadata.resource_version
+ else:
+ return None
+
+ @threaded
+ def watch(self):
+ """ Start a thread which will use the kubernetes watch client against a resource """
+ self.fetch()
+ if self.api_ok:
+ self.log.info("[INF] : Attaching resource watcher for k8s "
+ "{}.{}".format(self.api_name, self.method_name))
+ self.watcher = self._watch()
+
+ @threaded
+ def _watch(self):
+ """ worker thread that runs the kubernetes watch """
+
+ res_ver = self.resource_version
+ self.log.info("[INF] Attaching resource watcher for k8s "
+ "{}/{}".format(self.api_name, self.method_name))
+ w = watch.Watch()
+ func = getattr(self.api, self.method_name)
+
+ try:
+ # execute generator to continually watch resource for changes
+ for item in w.stream(func, resource_version=res_ver, watch=True):
+ obj = item['object']
+ try:
+ name = obj.metadata.name
+ except AttributeError:
+ name = None
+ self.log.warning("[WRN] {}.{} doesn't contain a metadata.name. "
+ "Unable to track changes".format(self.api_name,
+ self.method_name))
+
+ if item['type'] == 'ADDED':
+ if self.filter(obj):
+ if self.items and name:
+ self.items[name] = obj
+
+ elif item['type'] == 'DELETED':
+ if self.filter(obj):
+ if self.items and name:
+ del self.items[name]
+
+ except AttributeError as e:
+ self.health = "[ERR] : Unable to attach watcher - incompatible urllib3? ({})".format(e)
+ self.log.error(self.health)
+
+
+
+class StorageClass(KubernetesResource):
+
+ def __init__(self, api_name='StorageV1Api', method_name='list_storage_class', log=None):
+ KubernetesResource.__init__(self, api_name, method_name, log)
+
+ @property
+ def data(self):
+ """ provide a more readable/consumable version of the list_storage_class raw data """
+
+ pool_lookup = dict()
+
+ # raw_data contains an items list of V1StorageClass objects. Each object contains metadata
+ # attribute which is a V1ObjectMeta object
+ for item in self.raw_data.items:
+ if not item.provisioner.startswith(('ceph.rook', 'cephfs.csi.ceph', "rbd.csi.ceph")):
+ continue
+
+ sc_name = item.metadata.name
+ # pool used by ceph csi, blockPool by legacy storageclass definition
+ pool_name = item.parameters.get('pool', item.parameters.get('blockPool'))
+ if pool_name in pool_lookup:
+ pool_lookup[pool_name].append(sc_name)
+ else:
+ pool_lookup[pool_name] = list([sc_name])
+
+ return pool_lookup
+
+
class RookEnv(object):
def __init__(self):
# POD_NAMESPACE already exist for Rook 0.9
self._k8s = None
self._rook_cluster = None
self._rook_env = RookEnv()
+ self.k8s_resource = dict()
self._shutdown = threading.Event()
self._initialized.wait()
return self._rook_cluster
+ def get_k8s_resource(self, resource_type=None, mode='readable'):
+ """ Return specific k8s resource data """
+
+ if resource_type in self.k8s_resource:
+ if mode == 'readable':
+ return self.k8s_resource[resource_type].data
+ else:
+ return self.k8s_resource[resource_type].raw_data
+ else:
+ self.log.warning("[WRN] request ignored for non-existent k8s resource - {}".format(resource_type))
+ return None
+
def serve(self):
# For deployed clusters, we should always be running inside
# a Rook cluster. For development convenience, also support
self._k8s = client.CoreV1Api()
+ self.k8s_object['storageclass'] = StorageClass(log=self.log)
+ if self.k8s_object['storageclass'].valid:
+ self.log.info("Fetching available storageclass definitions")
+ self.k8s_object['storageclass'].watch()
+ else:
+ self.log.warning("[WRN] Unable to use k8s API - "
+ "{}/{}".format(self.k8s_object['storageclass'].api_name,
+ self.k8s_object['storageclass'].method_name))
+
try:
# XXX mystery hack -- I need to do an API call from
# this context, or subsequent API usage from handle_command