From: John Spray Date: Wed, 2 May 2018 10:58:10 +0000 (+0100) Subject: mgr/rook: Rook orchestrator module X-Git-Tag: v14.0.1~650^2~5 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=f3164d0fd2277479c6c3b203764566eda51b8e72;p=ceph-ci.git mgr/rook: Rook orchestrator module Signed-off-by: John Spray --- diff --git a/src/pybind/mgr/rook/__init__.py b/src/pybind/mgr/rook/__init__.py new file mode 100644 index 00000000000..286a1101277 --- /dev/null +++ b/src/pybind/mgr/rook/__init__.py @@ -0,0 +1,2 @@ + +from module import RookOrchestrator diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py new file mode 100644 index 00000000000..0120e916af1 --- /dev/null +++ b/src/pybind/mgr/rook/module.py @@ -0,0 +1,392 @@ +import threading +import functools +import os +import uuid + +from mgr_module import MgrModule + +import orchestrator + +try: + from kubernetes import client, config + + kubernetes_imported = True +except ImportError: + kubernetes_imported = False + client = None + config = None + +from rook_cluster import RookCluster, ApplyException + + +all_completions = [] + + +class RookReadCompletion(orchestrator.ReadCompletion): + """ + All reads are simply API calls: avoid spawning + huge numbers of threads by just running them + inline when someone calls wait() + """ + + def __init__(self, cb): + super(RookReadCompletion, self).__init__() + self.cb = cb + self.result = None + self._complete = False + + self.message = "" + + # XXX hacky global + global all_completions + all_completions.append(self) + + @property + def is_complete(self): + return self._complete + + def execute(self): + self.result = self.cb() + self._complete = True + + +class RookWriteCompletion(orchestrator.WriteCompletion): + """ + Writes are a two-phase thing, firstly sending + the write to the k8s API (fast) and then waiting + for the corresponding change to appear in the + Ceph cluster (slow) + """ + # XXX kubernetes bindings call_api already usefully has + # a completion= param that uses threads. Maybe just + # use that? + def __init__(self, execute_cb, complete_cb, message): + super(RookWriteCompletion, self).__init__() + self.execute_cb = execute_cb + self.complete_cb = complete_cb + + # Executed means I executed my k8s API call, it may or may + # not have succeeded + self.executed = False + + # Result of k8s API call, this is set if executed==True + self.k8s_result = None + + self.effective = False + + self.id = str(uuid.uuid4()) + + self.message = message + + self.error = None + + # XXX hacky global + global all_completions + all_completions.append(self) + + @property + def is_persistent(self): + return (not self.is_errored) and self.executed + + @property + def is_effective(self): + return self.effective + + @property + def is_errored(self): + return self.error is not None + + def execute(self): + if not self.executed: + self.k8s_result = self.execute_cb() + self.executed = True + + if not self.effective: + # TODO: check self.result for API errors + if self.complete_cb is None: + self.effective = True + else: + self.effective = self.complete_cb() + + +def deferred_read(f): + """ + Decorator to make RookOrchestrator methods return + a completion object that executes themselves. + """ + + @functools.wraps(f) + def wrapper(*args, **kwargs): + return RookReadCompletion(lambda: f(*args, **kwargs)) + + return wrapper + + +class RookOrchestrator(MgrModule, orchestrator.Orchestrator): + OPTIONS = [ + # TODO: configure k8s API addr instead of assuming local + ] + + def _progress(self, *args, **kwargs): + try: + self.remote("progress", *args, **kwargs) + except ImportError: + # If the progress module is disabled that's fine, + # they just won't see the output. + pass + + def wait(self, completions): + self.log.info("wait: completions={0}".format(completions)) + + incomplete = False + + # Our `wait` implementation is very simple because everything's + # just an API call. + for c in completions: + if not isinstance(c, RookReadCompletion) and \ + not isinstance(c, RookWriteCompletion): + raise TypeError( + "wait() requires list of completions, not {0}".format( + c.__class__ + )) + + if c.is_complete: + continue + + if not c.is_read: + self._progress("update", c.id, c.message, 0.5) + + try: + c.execute() + except Exception as e: + self.log.exception("Completion {0} threw an exception:".format( + c.message + )) + c.error = e + if not c.is_read: + self._progress("complete", c.id) + else: + if c.is_complete: + if not c.is_read: + self._progress("complete", c.id) + + if not c.is_complete: + incomplete = True + + return not incomplete + + @staticmethod + def can_run(): + if kubernetes_imported: + return True, "" + else: + return False, "kubernetes module not found" + + def __init__(self, *args, **kwargs): + super(RookOrchestrator, self).__init__(*args, **kwargs) + + self._initialized = threading.Event() + self._k8s = None + self._rook_cluster = None + + self._shutdown = threading.Event() + + def shutdown(self): + self._shutdown.set() + + @property + def k8s(self): + self._initialized.wait() + return self._k8s + + @property + def rook_cluster(self): + self._initialized.wait() + return self._rook_cluster + + def serve(self): + # For deployed clusters, we should always be running inside + # a Rook cluster. For development convenience, also support + # running outside (reading ~/.kube config) + in_cluster = 'ROOK_CLUSTER_NAME' in os.environ + if in_cluster: + config.load_incluster_config() + cluster_name = os.environ['ROOK_CLUSTER_NAME'] + else: + self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~") + config.load_kube_config() + + cluster_name = "rook" + + # So that I can do port forwarding from my workstation - jcsp + from kubernetes.client import configuration + configuration.verify_ssl = False + + self._k8s = client.CoreV1Api() + + # XXX mystery hack -- I need to do an API call from + # this context, or subsequent API usage from handle_command + # fails with SSLError('bad handshake'). Suspect some kind of + # thread context setup in SSL lib? + self._k8s.list_namespaced_pod(cluster_name) + + self._rook_cluster = RookCluster( + self._k8s, + cluster_name) + + # In case Rook isn't already clued in to this ceph + # cluster's existence, initialize it. + # self._rook_cluster.init_rook() + + self._initialized.set() + + while not self._shutdown.is_set(): + # XXX hack (or is it?) to kick all completions periodically, + # in case we had a caller that wait()'ed on them long enough + # to get persistence but not long enough to get completion + + global all_completions + self.wait(all_completions) + all_completions = filter(lambda x: not x.is_complete, + all_completions) + + self._shutdown.wait(5) + + # TODO: watch Rook for config changes to complain/update if + # things look a bit out of sync? + + @deferred_read + def get_inventory(self, node_filter=None): + node_list = None + if node_filter and node_filter.nodes: + # Explicit node list + node_list = node_filter.nodes + elif node_filter and node_filter.labels: + # TODO: query k8s API to resolve to node list, and pass + # it into RookCluster.get_discovered_devices + raise NotImplementedError() + + devs = self.rook_cluster.get_discovered_devices(node_list) + + result = [] + for node_name, node_devs in devs.items(): + devs = [] + for d in node_devs: + dev = orchestrator.InventoryDevice() + + # XXX CAUTION! https://github.com/rook/rook/issues/1716 + # Passing this through for the sake of completeness but it + # is not trustworthy! + dev.blank = d['empty'] + dev.type = 'hdd' if d['rotational'] else 'ssd' + dev.id = d['name'] + dev.size = d['size'] + + if d['filesystem'] == "" and not d['rotational']: + # Empty or partitioned SSD + partitioned_space = sum( + [p['size'] for p in d['Partitions']]) + dev.metadata_space_free = max(0, d[ + 'size'] - partitioned_space) + + devs.append(dev) + + result.append(orchestrator.InventoryNode(node_name, devs)) + + return result + + @deferred_read + def describe_service(self, service_type, service_id): + assert service_type in ("mds", "osd", "mon", "rgw") + + pods = self.rook_cluster.describe_pods(service_type, service_id) + + result = orchestrator.ServiceDescription() + for p in pods: + sl = orchestrator.ServiceLocation() + sl.nodename = p['nodename'] + sl.container_id = p['name'] + + if service_type == "osd": + sl.daemon_name = "%s" % p['labels']["ceph-osd-id"] + elif service_type == "mds": + # MDS daemon names are the tail of the pod name with + # an 'm' prefix. + # TODO: Would be nice to get this out a label though. + sl.daemon_name = "m" + sl.container_id.split("-")[-1] + elif service_type == "mon": + sl.daemon_name = p['labels']["mon"] + elif service_type == "mgr": + # FIXME: put a label on the pod to consume + # from here + raise NotImplementedError("mgr") + elif service_type == "rgw": + # FIXME: put a label on the pod to consume + # from here + raise NotImplementedError("rgw") + + result.locations.append(sl) + + return result + + def add_stateless_service(self, service_type, spec): + # assert isinstance(spec, orchestrator.StatelessServiceSpec) + + if service_type == "mds": + return RookWriteCompletion( + lambda: self.rook_cluster.add_filesystem(spec), None, + "Creating Filesystem services for {0}".format(spec.name)) + else: + # TODO: RGW, NFS + raise NotImplementedError(service_type) + + def create_osds(self, spec): + # Validate spec.node + if not self.rook_cluster.node_exists(spec.node): + raise RuntimeError("Node '{0}' is not in the Kubernetes " + "cluster".format(spec.node)) + + # Validate whether cluster CRD can accept individual OSD + # creations (i.e. not useAllDevices) + if not self.rook_cluster.can_create_osd(): + raise RuntimeError("Rook cluster configuration does not " + "support OSD creation.") + + def execute(): + self.rook_cluster.add_osds(spec) + + def is_complete(): + # Find OSD pods on this host + pod_osd_ids = set() + pods = self._k8s.list_namespaced_pod("rook-ceph", + label_selector="rook_cluster=rook-ceph,app=rook-ceph-osd", + field_selector="spec.nodeName={0}".format( + spec.node + )).items + for p in pods: + pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id'])) + + self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids)) + + found = [] + osdmap = self.get("osd_map") + for osd in osdmap['osds']: + osd_id = osd['osd'] + if osd_id not in pod_osd_ids: + continue + + metadata = self.get_metadata('osd', "%s" % osd_id) + if metadata and metadata['devices'] in spec.drive_group.devices: + found.append(osd_id) + else: + self.log.info("ignoring osd {0} {1}".format( + osd_id, metadata['devices'] + )) + + return found is not None + + return RookWriteCompletion(execute, is_complete, + "Creating OSD on {0}:{1}".format( + spec.node, + spec.drive_group.devices + )) diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py new file mode 100644 index 00000000000..c4046fe5791 --- /dev/null +++ b/src/pybind/mgr/rook/rook_cluster.py @@ -0,0 +1,320 @@ +""" +This module wrap's Rook + Kubernetes APIs to expose the calls +needed to implement an orchestrator module. While the orchestrator +module exposes an async API, this module simply exposes blocking API +call methods. + +This module is runnable outside of ceph-mgr, useful for testing. +""" + +import urlparse +import logging +import json + +# Optional kubernetes imports to enable MgrModule.can_run +# to behave cleanly. +try: + from kubernetes.client.rest import ApiException +except ImportError: + ApiException = None + +ROOK_SYSTEM_NS = "rook-ceph-system" +ROOK_API_VERSION = "v1alpha1" +ROOK_API_NAME = "ceph.rook.io/%s" % ROOK_API_VERSION + +log = logging.getLogger('rook') + + +class ApplyException(Exception): + """ + For failures to update the Rook CRDs, usually indicating + some kind of interference between our attempted update + and other conflicting activity. + """ + + +class RookCluster(object): + def __init__(self, k8s, cluster_name): + self.cluster_name = cluster_name + self.k8s = k8s + + @property + def rook_namespace(self): + # For the moment, assume Rook NS always equal to cluster name + # (this is also assumed some places in Rook source, may + # be formalized at some point) + return self.cluster_name + + def init_rook(self): + """ + Create a passive Rook configuration for this Ceph cluster. This + will prompt Rook to start watching for other resources within + the cluster (e.g. Filesystem CRDs), but no other action will happen. + """ + + # TODO: complete or remove this functionality: if Rook wasn't + # already running, then we would need to supply it with + # keys and ceph.conf as well as creating the cluster CRD + + cluster_crd = { + "apiVersion": ROOK_API_NAME, + "kind": "Cluster", + "metadata": { + "name": self.cluster_name, + "namespace": self.cluster_name + }, + "spec": { + "backend": "ceph", + "hostNetwork": True + } + } + + self.rook_api_post("clusters", body=cluster_crd) + + def rook_url(self, path): + prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % ( + ROOK_API_VERSION, self.rook_namespace) + return urlparse.urljoin(prefix, path) + + def rook_api_call(self, verb, path, **kwargs): + full_path = self.rook_url(path) + log.debug("[%s] %s" % (verb, full_path)) + + return self.k8s.api_client.call_api( + full_path, + verb, + auth_settings=['BearerToken'], + response_type="object", + _return_http_data_only=True, + _preload_content=True, + **kwargs) + + def rook_api_get(self, path, **kwargs): + return self.rook_api_call("GET", path, **kwargs) + + def rook_api_patch(self, path, **kwargs): + return self.rook_api_call("PATCH", path, + header_params={"Content-Type": "application/json-patch+json"}, + **kwargs) + + def rook_api_post(self, path, **kwargs): + return self.rook_api_call("POST", path, **kwargs) + + def get_discovered_devices(self, nodenames=None): + # TODO: replace direct k8s calls with Rook API calls + # when they're implemented + label_selector = "app=rook-discover" + if nodenames is not None: + # FIXME: is there a practical or official limit on the + # number of entries in a label selector + label_selector += ", rook.io/node in ({0})".format( + ", ".join(nodenames)) + + try: + result = self.k8s.list_namespaced_config_map( + ROOK_SYSTEM_NS, + label_selector=label_selector) + except ApiException as e: + log.warn("Failed to fetch device metadata: {0}".format(e)) + raise + + nodename_to_devices = {} + for i in result.items: + drives = json.loads(i.data['devices']) + nodename_to_devices[i.metadata.labels['rook.io/node']] = drives + + return nodename_to_devices + + def describe_pods(self, service_type, service_id): + # Go query the k8s API about deployment, containers related to this + # filesystem + + # Inspect the Rook YAML, to decide whether this filesystem + # is Ceph-managed or Rook-managed + # TODO: extend Orchestrator interface to describe whether FS + # is manageable by us or not + + # Example Rook Pod labels for a mgr daemon: + # Labels: app=rook-ceph-mgr + # pod-template-hash=2171958073 + # rook_cluster=rook + # And MDS containers additionally have `rook_filesystem` label + + # Label filter is rook_cluster= + # rook_file_system= + + label_filter = "rook_cluster={0},app=rook-ceph-{1}".format( + self.cluster_name, service_type) + if service_type == "mds": + label_filter += ",rook_file_system={0}".format(service_id) + elif service_type == "osd": + # Label added in https://github.com/rook/rook/pull/1698 + label_filter += ",ceph-osd-id={0}".format(service_id) + elif service_type == "mon": + # label like mon=rook-ceph-mon0 + label_filter += ",mon={0}".format(service_id) + elif service_type == "mgr": + # TODO: get Rook to label mgr pods + pass + elif service_type == "rgw": + # TODO: rgw + pass + + pods = self.k8s.list_namespaced_pod( + self.rook_namespace, + label_selector=label_filter) + + # import json + # print json.dumps(pods.items[0]) + + pods_summary = [] + + for p in pods.items: + d = p.to_dict() + # p['metadata']['creationTimestamp'] + # p['metadata']['nodeName'] + pods_summary.append({ + "name": d['metadata']['name'], + "nodename": d['spec']['node_name'], + "labels": d['metadata']['labels'] + }) + pass + + return pods_summary + + def add_filesystem(self, spec): + # TODO use spec.placement + # TODO use spec.min_size (and use max_size meaningfully) + # TODO warn if spec.extended has entries we don't kow how + # to action. + + rook_fs = { + "apiVersion": ROOK_API_NAME, + "kind": "Filesystem", + "metadata": { + "name": spec.name, + "namespace": self.rook_namespace + }, + "spec": { + "preservePoolsOnRemove": True, + "skipPoolCreation": True, + "metadataServer": { + "activeCount": spec.max_size, + "activeStandby": True + + } + } + } + + try: + self.rook_api_post( + "filesystems/", + body=rook_fs + ) + except ApiException as e: + if e.status == 409: + log.info("Filesystem '{0}' already exists".format(spec.name)) + # Idempotent, succeed. + else: + raise + + def can_create_osd(self): + current_cluster = self.rook_api_get( + "clusters/{0}".format(self.cluster_name)) + use_all_nodes = current_cluster['spec'].get('useAllNodes', False) + + # If useAllNodes is set, then Rook will not be paying attention + # to anything we put in 'nodes', so can't do OSD creation. + return not use_all_nodes + + def node_exists(self, node_name): + try: + self.k8s.read_node(node_name) + except ApiException as e: + if e.status == 404: + return False + else: + raise + else: + return True + + def add_osds(self, spec): + """ + Rook currently (0.8) can only do single-drive OSDs, so we + treat all drive groups as just a list of individual OSDs. + """ + # assert isinstance(spec, orchestrator.OsdSpec) + + block_devices = spec.drive_group.devices + + assert spec.format in ("bluestore", "filestore") + + # The CRD looks something like this: + # nodes: + # - name: "gravel1.rockery" + # devices: + # - name: "sdb" + # storeConfig: + # storeType: bluestore + + current_cluster = self.rook_api_get( + "clusters/{0}".format(self.cluster_name)) + + patch = [] + + # FIXME: this is all not really atomic, because jsonpatch doesn't + # let us do "test" operations that would check if items with + # matching names were in existing lists. + + if 'nodes' not in current_cluster['spec']['storage']: + patch.append({ + 'op': 'add', 'path': '/spec/storage/nodes', 'value': [] + }) + + current_nodes = current_cluster['spec']['storage'].get('nodes', []) + + if spec.node not in [n['name'] for n in current_nodes]: + patch.append({ + "op": "add", "path": "/spec/storage/nodes/-", "value": { + "name": spec.node, + "devices": [{'name': d} for d in block_devices], + "storeConfig": { + "storeType": spec.format + } + } + }) + else: + # Extend existing node + node_idx = None + current_node = None + for i, c in enumerate(current_nodes): + if c['name'] == spec.node: + current_node = c + node_idx = i + break + + assert node_idx is not None + assert current_node is not None + + new_devices = list(set(block_devices) - set([d['name'] for d in current_node['devices']])) + + for n in new_devices: + patch.append({ + "op": "add", + "path": "/spec/storage/nodes/{0}/devices/-".format(node_idx), + "value": {'name': n} + }) + + if len(patch) == 0: + log.warning("No-op adding stateful service") + return + + try: + self.rook_api_patch( + "clusters/{0}".format(self.cluster_name), + body=patch) + except ApiException as e: + log.exception("API exception: {0}".format(e.message)) + raise ApplyException( + "Failed to create OSD entries in Cluster CRD: {0}".format( + e.message))