import functools
import os
import uuid
+
try:
- from typing import List
+ from typing import List, Dict
from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
pass # just for type checking
try:
- from kubernetes import client, config, watch
+ from kubernetes import client, config
from kubernetes.client.rest import ApiException
kubernetes_imported = True
self.message = message
- self.error = None
+ self.exception = None
# XXX hacky global
global all_completions
def is_effective(self):
return self.effective
- @property
- def is_errored(self):
- return self.error is not None
-
def execute(self):
if not self.executed:
self._result = self.execute_cb()
return wrapper
-def threaded(f):
- def wrapper(*args, **kwargs):
- t = threading.Thread(target=f, args=args, kwargs=kwargs)
- t.daemon = True
- t.start()
- return t
- return wrapper
-
-
-class KubernetesResource(object):
- """ Generic kubernetes Resource parent class
-
- The api fetch and watch methods should be common across resource types,
- so this class implements the basics, and child classes should implement
- anything specific to the resource type
- """
-
- def __init__(self, api_name, method_name, log):
- self.api_name = api_name
- self.api = None
- self.method_name = method_name
- self.health = 'OK' # OK or error reason
- self.raw_data = None
- self.log = log
-
- @property
- def valid(self):
- """ Check that the api, and method are viable """
-
- if not hasattr(client, self.api_name):
- self.health = "[ERR] : API is invalid"
- return False
- try:
- _api = getattr(client, self.api_name)()
- except ApiException:
- self.health = "[ERR] : API is inaccessible"
- return False
- else:
- self.api = _api
- if not hasattr(_api, self.method_name):
- self.health = "[ERR] : {} is an anvalid method of {}".format(self.method_name, self.api_name)
- return False
- return True
-
- def fetch(self):
- """ Execute the requested api method as a one-off fetch"""
- if self.valid:
- try:
- response = getattr(self.api, self.method_name)()
- except ApiException:
- self.raw_data = None
- self.health = "[ERR] : k8s API call failed against {}.{}".format(self.api_name,
- self.method_name)
- self.log.error(self.health)
- else:
- self.last = time.time()
- self.raw_data = response
- if hasattr(response, 'items'):
- self.items.clear()
- for item in response.items:
- name = item.metadata.name
- self.items[name] = item
- self.health = "[ERR] : API {}.{} is invalid/inaccessible".format(self.api_name,
- self.method_name)
- self.log.error(self.health)
-
-
- @property
- def data(self):
- """ Process raw_data into a consumable dict - Override in the child """
- return self.raw_data
-
- @property
- def resource_version(self):
- # metadata is a V1ListMeta object type
- if hasattr(self.raw_data, 'metadata'):
- return self.raw_data.metadata.resource_version
- else:
- return None
-
- @threaded
- def watch(self):
- """ Start a thread which will use the kubernetes watch client against a resource """
- self.fetch()
- if self.api_ok:
- self.log.info("[INF] : Attaching resource watcher for k8s "
- "{}.{}".format(self.api_name, self.method_name))
- self.watcher = self._watch()
-
- @threaded
- def _watch(self):
- """ worker thread that runs the kubernetes watch """
-
- res_ver = self.resource_version
- self.log.info("[INF] Attaching resource watcher for k8s "
- "{}/{}".format(self.api_name, self.method_name))
- w = watch.Watch()
- func = getattr(self.api, self.method_name)
-
- try:
- # execute generator to continually watch resource for changes
- for item in w.stream(func, resource_version=res_ver, watch=True):
- obj = item['object']
- try:
- name = obj.metadata.name
- except AttributeError:
- name = None
- self.log.warning("[WRN] {}.{} doesn't contain a metadata.name. "
- "Unable to track changes".format(self.api_name,
- self.method_name))
-
- if item['type'] == 'ADDED':
- if self.filter(obj):
- if self.items and name:
- self.items[name] = obj
-
- elif item['type'] == 'DELETED':
- if self.filter(obj):
- if self.items and name:
- del self.items[name]
-
- except AttributeError as e:
- self.health = "[ERR] : Unable to attach watcher - incompatible urllib3? ({})".format(e)
- self.log.error(self.health)
-
-
-
-class StorageClass(KubernetesResource):
-
- def __init__(self, api_name='StorageV1Api', method_name='list_storage_class', log=None):
- KubernetesResource.__init__(self, api_name, method_name, log)
-
- @property
- def data(self):
- """ provide a more readable/consumable version of the list_storage_class raw data """
-
- pool_lookup = dict()
-
- # raw_data contains an items list of V1StorageClass objects. Each object contains metadata
- # attribute which is a V1ObjectMeta object
- for item in self.raw_data.items:
- if not item.provisioner.startswith(('ceph.rook', 'cephfs.csi.ceph', "rbd.csi.ceph")):
- continue
-
- sc_name = item.metadata.name
- # pool used by ceph csi, blockPool by legacy storageclass definition
- pool_name = item.parameters.get('pool', item.parameters.get('blockPool'))
- if pool_name in pool_lookup:
- pool_lookup[pool_name].append(sc_name)
- else:
- pool_lookup[pool_name] = list([sc_name])
-
- return pool_lookup
-
-
class RookEnv(object):
def __init__(self):
        # POD_NAMESPACE already exists for Rook 0.9
        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)
- self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', "rook-ceph-system")
+ self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
self.api_name = "ceph.rook.io/" + self.crd_version
self._k8s = None
self._rook_cluster = None
self._rook_env = RookEnv()
- self.k8s_resource = dict()
self._shutdown = threading.Event()
self._initialized.wait()
return self._rook_cluster
- def get_k8s_resource(self, resource_type=None, mode='readable'):
- """ Return specific k8s resource data """
-
- if resource_type in self.k8s_resource:
- if mode == 'readable':
- return self.k8s_resource[resource_type].data
- else:
- return self.k8s_resource[resource_type].raw_data
- else:
- self.log.warning("[WRN] request ignored for non-existent k8s resource - {}".format(resource_type))
- return None
-
def serve(self):
# For deployed clusters, we should always be running inside
# a Rook cluster. For development convenience, also support
# running outside (reading ~/.kube config)
- if self._rook_env.cluster_name:
+ if self._rook_env.has_namespace():
config.load_incluster_config()
cluster_name = self._rook_env.cluster_name
else:
self._k8s = client.CoreV1Api()
- self.k8s_object['storageclass'] = StorageClass(log=self.log)
- if self.k8s_object['storageclass'].valid:
- self.log.info("Fetching available storageclass definitions")
- self.k8s_object['storageclass'].watch()
- else:
- self.log.warning("[WRN] Unable to use k8s API - "
- "{}/{}".format(self.k8s_object['storageclass'].api_name,
- self.k8s_object['storageclass'].method_name))
-
try:
# XXX mystery hack -- I need to do an API call from
# this context, or subsequent API usage from handle_command
return result
+ @deferred_read
+ def get_hosts(self):
+ return [orchestrator.InventoryNode(n, []) for n in self.rook_cluster.get_node_names()]
+
@deferred_read
def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
This module is runnable outside of ceph-mgr, useful for testing.
"""
+import threading
import logging
import json
from contextlib import contextmanager
# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
+from mgr_util import merge_dicts
+
try:
from kubernetes.client.rest import ApiException
+ from kubernetes.client import V1ListMeta, CoreV1Api, V1Pod
+ from kubernetes import watch
except ImportError:
- ApiException = None
+    class ApiException(Exception):
+        pass
+
+
+import orchestrator
+
try:
- import orchestrator
from rook.module import RookEnv
- from typing import List
+ from typing import List, Dict
except ImportError:
pass # just used for type checking.
-
log = logging.getLogger(__name__)
-class ApplyException(Exception):
+class ApplyException(orchestrator.OrchestratorError):
"""
For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and other conflicting activity.
"""
+def threaded(f):
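+    """Run each call to the wrapped function in a new thread and return the Thread."""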
+ def wrapper(*args, **kwargs):
+ t = threading.Thread(target=f, args=args, kwargs=kwargs)
+ t.start()
+ return t
+
+ return wrapper
+
+
+class KubernetesResource(object):
+ def __init__(self, api_func, **kwargs):
+ """
+ Generic kubernetes Resource parent class
+
+        The api fetch and watch methods should be common across resource types.
+
+ Exceptions in the runner thread are propagated to the caller.
+
+ :param api_func: kubernetes client api function that is passed to the watcher
+        :param kwargs: Extra keyword arguments are passed through to ``api_func``
+            (e.g. ``namespace`` or ``label_selector``).
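+
+        Example (illustrative; mirrors how ``RookCluster`` below wraps
+        ``CoreV1Api.list_node``)::
+
+            nodes = KubernetesResource(CoreV1Api().list_node)
+            node_names = [n.metadata.name for n in nodes.items]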
+ """
+ self.kwargs = kwargs
+ self.api_func = api_func
+
+ # ``_items`` is accessed by different threads. I assume assignment is atomic.
+ self._items = dict()
+ self.thread = None # type: threading.Thread
+ self.exception = None
+
+ def _fetch(self):
+ """ Execute the requested api method as a one-off fetch"""
+ response = self.api_func(**self.kwargs)
+ # metadata is a V1ListMeta object type
+ metadata = response.metadata # type: V1ListMeta
+ self._items = {item.metadata.name: item for item in response.items}
+ log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
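+        # The returned resource_version lets the subsequent watch resume exactly
+        # where this fetch left off.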
+ return metadata.resource_version
+
+ @property
+ def items(self):
+ """
+ Returns the items of the request.
+ Creates the watcher as a side effect.
+        :return: the current items, updated in the background by the watcher thread.
+ """
+ if self.exception:
+ e = self.exception
+ self.exception = None
+ raise e # Propagate the exception to the user.
+ if not self.thread or not self.thread.is_alive():
+ # Start a thread which will use the kubernetes watch client against a resource
+ resource_version = self._fetch()
+ log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
+ self.thread = self._watch(resource_version) # type: threading.Thread
+
+ return self._items.values()
+
+ @threaded
+ def _watch(self, res_ver):
+ """ worker thread that runs the kubernetes watch """
+
+ self.exception = None
+
+ w = watch.Watch()
+
+ try:
+ # execute generator to continually watch resource for changes
+ for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
+ **self.kwargs):
+ item = event['object']
+ try:
+ name = item.metadata.name
+ except AttributeError:
+ raise AttributeError(
+ "{} doesn't contain a metadata.name. Unable to track changes".format(
+ self.api_func))
+
+ log.info('{} event: {}'.format(event['type'], name))
+
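+                # Replace the dict instead of mutating it in place, so readers in
+                # other threads always see a consistent snapshot (see __init__ note).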
+ if event['type'] in ('ADDED', 'MODIFIED'):
+ self._items = merge_dicts(self._items, {name: item})
+ elif event['type'] == 'DELETED':
+                    self._items = {k: v for k, v in self._items.items() if k != name}
+ elif event['type'] == 'BOOKMARK':
+ pass
+ elif event['type'] == 'ERROR':
+ raise ApiException(str(event))
+ else:
+ raise KeyError('Unknown watch event {}'.format(event['type']))
+
+ except ApiException as e:
+ log.exception('K8s API failed. {}'.format(self.api_func))
+ self.exception = e
+ raise
+ except AttributeError as e:
+ log.exception(
+ "Unable to attach watcher - incompatible urllib3? ({})".format(self.api_func))
+ self.exception = e
+ raise
+ except Exception as e:
+ log.exception("Watcher failed. ({})".format(self.api_func))
+ self.exception = e
+ raise
+
+
class RookCluster(object):
def __init__(self, k8s, rook_env):
self.rook_env = rook_env # type: RookEnv
- self.k8s = k8s
+ self.k8s = k8s # type: CoreV1Api
+
+ # TODO: replace direct k8s calls with Rook API calls
+ # when they're implemented
+ self.inventory_maps = KubernetesResource(self.k8s.list_namespaced_config_map,
+ namespace=self.rook_env.operator_namespace,
+ label_selector="app=rook-discover")
+
+ self.rook_pods = KubernetesResource(self.k8s.list_namespaced_pod,
+ namespace=self.rook_env.namespace,
+ label_selector="rook_cluster={0}".format(
+ self.rook_env.cluster_name))
+ self.nodes = KubernetesResource(self.k8s.list_node)
def rook_url(self, path):
prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
return self.rook_api_call("POST", path, **kwargs)
def get_discovered_devices(self, nodenames=None):
- # TODO: replace direct k8s calls with Rook API calls
- # when they're implemented
- label_selector = "app=rook-discover"
- if nodenames is not None:
- # FIXME: is there a practical or official limit on the
- # number of entries in a label selector
- label_selector += ", rook.io/node in ({0})".format(
- ", ".join(nodenames))
+ def predicate(item):
+ if nodenames is not None:
+ return item.metadata.labels['rook.io/node'] in nodenames
+ else:
+ return True
try:
- result = self.k8s.list_namespaced_config_map(
- self.rook_env.operator_namespace,
- label_selector=label_selector)
+ result = [i for i in self.inventory_maps.items if predicate(i)]
except ApiException as e:
- log.exception("Failed to fetch device metadata: {0}".format(e))
+ log.exception("Failed to fetch device metadata")
raise
nodename_to_devices = {}
- for i in result.items:
+ for i in result:
drives = json.loads(i.data['devices'])
nodename_to_devices[i.metadata.labels['rook.io/node']] = drives
url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
return url
-
def describe_pods(self, service_type, service_id, nodename):
- # Go query the k8s API about deployment, containers related to this
- # filesystem
-
- # Inspect the Rook YAML, to decide whether this filesystem
- # is Ceph-managed or Rook-managed
- # TODO: extend Orchestrator interface to describe whether FS
- # is manageable by us or not
-
- # Example Rook Pod labels for a mgr daemon:
- # Labels: app=rook-ceph-mgr
- # pod-template-hash=2171958073
- # rook_cluster=rook
- # And MDS containers additionally have `rook_filesystem` label
-
- # Label filter is rook_cluster=<cluster name>
- # rook_file_system=<self.fs_name>
-
- label_filter = "rook_cluster={0}".format(self.rook_env.cluster_name)
- if service_type != None:
- label_filter += ",app=rook-ceph-{0}".format(service_type)
- if service_id != None:
- if service_type == "mds":
- label_filter += ",rook_file_system={0}".format(service_id)
- elif service_type == "osd":
- # Label added in https://github.com/rook/rook/pull/1698
- label_filter += ",ceph-osd-id={0}".format(service_id)
- elif service_type == "mon":
- # label like mon=rook-ceph-mon0
- label_filter += ",mon={0}".format(service_id)
- elif service_type == "mgr":
- label_filter += ",mgr={0}".format(service_id)
- elif service_type == "nfs":
- label_filter += ",ceph_nfs={0}".format(service_id)
- elif service_type == "rgw":
- # TODO: rgw
- pass
+ """
+        Go query the k8s API about deployments and containers related to this
+        filesystem.
- field_filter = ""
- if nodename != None:
- field_filter = "spec.nodeName={0}".format(nodename)
+ Example Rook Pod labels for a mgr daemon:
+ Labels: app=rook-ceph-mgr
+ pod-template-hash=2171958073
+ rook_cluster=rook
+ And MDS containers additionally have `rook_filesystem` label
- pods = self.k8s.list_namespaced_pod(
- self.rook_env.namespace,
- label_selector=label_filter,
- field_selector=field_filter)
+ Label filter is rook_cluster=<cluster name>
+ rook_file_system=<self.fs_name>
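+
+        Illustrative call (the service id is made up)::
+
+            describe_pods("mon", "a", None)
+            # -> [{'name': ..., 'nodename': ..., 'labels': {...}}, ...]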
+ """
+ def predicate(item):
+ # type: (V1Pod) -> bool
+ metadata = item.metadata
+ if service_type is not None:
+ if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
+ return False
+
+ if service_id is not None:
+ try:
+ k, v = {
+ "mds": ("rook_file_system", service_id),
+ "osd": ("ceph-osd-id", service_id),
+ "mon": ("mon", service_id),
+ "mgr": ("mgr", service_id),
+                        "nfs": ("ceph_nfs", service_id),
+ "rgw": ("ceph_rgw", service_id),
+ }[service_type]
+ except KeyError:
+ raise orchestrator.OrchestratorValidationError(
+ '{} not supported'.format(service_type))
+ if metadata.labels[k] != v:
+ return False
+
+ if nodename is not None:
+ if item.spec.node_name != nodename:
+ return False
+ return True
- # import json
- # print json.dumps(pods.items[0])
+ pods = [i for i in self.rook_pods.items if predicate(i)]
pods_summary = []
- for p in pods.items:
+ for p in pods:
d = p.to_dict()
# p['metadata']['creationTimestamp']
- # p['metadata']['nodeName']
pods_summary.append({
"name": d['metadata']['name'],
"nodename": d['spec']['node_name'],
"labels": d['metadata']['labels']
})
- pass
return pods_summary
+ def get_node_names(self):
+ return [i.metadata.name for i in self.nodes.items]
+
@contextmanager
def ignore_409(self, what):
try:
return not use_all_nodes
def node_exists(self, node_name):
- try:
- self.k8s.read_node(node_name, exact=False, export=True)
- except ApiException as e:
- if e.status == 404:
- return False
- else:
- raise
- else:
- return True
+ return node_name in self.get_node_names()
def update_mon_count(self, newcount):
patch = [{"op": "replace", "path": "/spec/mon/count", "value": newcount}]