]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rook: Rook orchestrator module
authorJohn Spray <john.spray@redhat.com>
Wed, 2 May 2018 10:58:10 +0000 (11:58 +0100)
committerJohn Spray <john.spray@redhat.com>
Mon, 6 Aug 2018 15:22:29 +0000 (16:22 +0100)
Signed-off-by: John Spray <john.spray@redhat.com>
src/pybind/mgr/rook/__init__.py [new file with mode: 0644]
src/pybind/mgr/rook/module.py [new file with mode: 0644]
src/pybind/mgr/rook/rook_cluster.py [new file with mode: 0644]

diff --git a/src/pybind/mgr/rook/__init__.py b/src/pybind/mgr/rook/__init__.py
new file mode 100644 (file)
index 0000000..286a110
--- /dev/null
@@ -0,0 +1,2 @@
+
+from module import RookOrchestrator
diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
new file mode 100644 (file)
index 0000000..0120e91
--- /dev/null
@@ -0,0 +1,392 @@
+import threading
+import functools
+import os
+import uuid
+
+from mgr_module import MgrModule
+
+import orchestrator
+
+try:
+    from kubernetes import client, config
+
+    kubernetes_imported = True
+except ImportError:
+    kubernetes_imported = False
+    client = None
+    config = None
+
+from rook_cluster import RookCluster, ApplyException
+
+
+all_completions = []
+
+
+class RookReadCompletion(orchestrator.ReadCompletion):
+    """
+    All reads are simply API calls: avoid spawning
+    huge numbers of threads by just running them
+    inline when someone calls wait()
+    """
+
+    def __init__(self, cb):
+        super(RookReadCompletion, self).__init__()
+        self.cb = cb
+        self.result = None
+        self._complete = False
+
+        self.message = "<read op>"
+
+        # XXX hacky global
+        global all_completions
+        all_completions.append(self)
+
+    @property
+    def is_complete(self):
+        return self._complete
+
+    def execute(self):
+        self.result = self.cb()
+        self._complete = True
+
+
+class RookWriteCompletion(orchestrator.WriteCompletion):
+    """
+    Writes are a two-phase thing, firstly sending
+    the write to the k8s API (fast) and then waiting
+    for the corresponding change to appear in the
+    Ceph cluster (slow)
+    """
+    # XXX kubernetes bindings call_api already usefully has
+    # a completion= param that uses threads.  Maybe just
+    # use that?
+    def __init__(self, execute_cb, complete_cb, message):
+        super(RookWriteCompletion, self).__init__()
+        self.execute_cb = execute_cb
+        self.complete_cb = complete_cb
+
+        # Executed means I executed my k8s API call, it may or may
+        # not have succeeded
+        self.executed = False
+
+        # Result of k8s API call, this is set if executed==True
+        self.k8s_result = None
+
+        self.effective = False
+
+        self.id = str(uuid.uuid4())
+
+        self.message = message
+
+        self.error = None
+
+        # XXX hacky global
+        global all_completions
+        all_completions.append(self)
+
+    @property
+    def is_persistent(self):
+        return (not self.is_errored) and self.executed
+
+    @property
+    def is_effective(self):
+        return self.effective
+
+    @property
+    def is_errored(self):
+        return self.error is not None
+
+    def execute(self):
+        if not self.executed:
+            self.k8s_result = self.execute_cb()
+            self.executed = True
+
+        if not self.effective:
+            # TODO: check self.result for API errors
+            if self.complete_cb is None:
+                self.effective = True
+            else:
+                self.effective = self.complete_cb()
+
+
+def deferred_read(f):
+    """
+    Decorator to make RookOrchestrator methods return
+    a completion object that executes themselves.
+    """
+
+    @functools.wraps(f)
+    def wrapper(*args, **kwargs):
+        return RookReadCompletion(lambda: f(*args, **kwargs))
+
+    return wrapper
+
+
+class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
+    OPTIONS = [
+        # TODO: configure k8s API addr instead of assuming local
+    ]
+
+    def _progress(self, *args, **kwargs):
+        try:
+            self.remote("progress", *args, **kwargs)
+        except ImportError:
+            # If the progress module is disabled that's fine,
+            # they just won't see the output.
+            pass
+
+    def wait(self, completions):
+        self.log.info("wait: completions={0}".format(completions))
+
+        incomplete = False
+
+        # Our `wait` implementation is very simple because everything's
+        # just an API call.
+        for c in completions:
+            if not isinstance(c, RookReadCompletion) and \
+                    not isinstance(c, RookWriteCompletion):
+                raise TypeError(
+                    "wait() requires list of completions, not {0}".format(
+                        c.__class__
+                    ))
+
+            if c.is_complete:
+                continue
+
+            if not c.is_read:
+                self._progress("update", c.id, c.message, 0.5)
+
+            try:
+                c.execute()
+            except Exception as e:
+                self.log.exception("Completion {0} threw an exception:".format(
+                    c.message
+                ))
+                c.error = e
+                if not c.is_read:
+                    self._progress("complete", c.id)
+            else:
+                if c.is_complete:
+                    if not c.is_read:
+                        self._progress("complete", c.id)
+
+            if not c.is_complete:
+                incomplete = True
+
+        return not incomplete
+
+    @staticmethod
+    def can_run():
+        if kubernetes_imported:
+            return True, ""
+        else:
+            return False, "kubernetes module not found"
+
+    def __init__(self, *args, **kwargs):
+        super(RookOrchestrator, self).__init__(*args, **kwargs)
+
+        self._initialized = threading.Event()
+        self._k8s = None
+        self._rook_cluster = None
+
+        self._shutdown = threading.Event()
+
+    def shutdown(self):
+        self._shutdown.set()
+
+    @property
+    def k8s(self):
+        self._initialized.wait()
+        return self._k8s
+
+    @property
+    def rook_cluster(self):
+        self._initialized.wait()
+        return self._rook_cluster
+
+    def serve(self):
+        # For deployed clusters, we should always be running inside
+        # a Rook cluster.  For development convenience, also support
+        # running outside (reading ~/.kube config)
+        in_cluster = 'ROOK_CLUSTER_NAME' in os.environ
+        if in_cluster:
+            config.load_incluster_config()
+            cluster_name = os.environ['ROOK_CLUSTER_NAME']
+        else:
+            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
+            config.load_kube_config()
+
+            cluster_name = "rook"
+
+            # So that I can do port forwarding from my workstation - jcsp
+            from kubernetes.client import configuration
+            configuration.verify_ssl = False
+
+        self._k8s = client.CoreV1Api()
+
+        # XXX mystery hack -- I need to do an API call from
+        # this context, or subsequent API usage from handle_command
+        # fails with SSLError('bad handshake').  Suspect some kind of
+        # thread context setup in SSL lib?
+        self._k8s.list_namespaced_pod(cluster_name)
+
+        self._rook_cluster = RookCluster(
+            self._k8s,
+            cluster_name)
+
+        # In case Rook isn't already clued in to this ceph
+        # cluster's existence, initialize it.
+        # self._rook_cluster.init_rook()
+
+        self._initialized.set()
+
+        while not self._shutdown.is_set():
+            # XXX hack (or is it?) to kick all completions periodically,
+            # in case we had a caller that wait()'ed on them long enough
+            # to get persistence but not long enough to get completion
+
+            global all_completions
+            self.wait(all_completions)
+            all_completions = filter(lambda x: not x.is_complete,
+                                     all_completions)
+
+            self._shutdown.wait(5)
+
+        # TODO: watch Rook for config changes to complain/update if
+        # things look a bit out of sync?
+
+    @deferred_read
+    def get_inventory(self, node_filter=None):
+        node_list = None
+        if node_filter and node_filter.nodes:
+            # Explicit node list
+            node_list = node_filter.nodes
+        elif node_filter and node_filter.labels:
+            # TODO: query k8s API to resolve to node list, and pass
+            # it into RookCluster.get_discovered_devices
+            raise NotImplementedError()
+
+        devs = self.rook_cluster.get_discovered_devices(node_list)
+
+        result = []
+        for node_name, node_devs in devs.items():
+            devs = []
+            for d in node_devs:
+                dev = orchestrator.InventoryDevice()
+
+                # XXX CAUTION!  https://github.com/rook/rook/issues/1716
+                # Passing this through for the sake of completeness but it
+                # is not trustworthy!
+                dev.blank = d['empty']
+                dev.type = 'hdd' if d['rotational'] else 'ssd'
+                dev.id = d['name']
+                dev.size = d['size']
+
+                if d['filesystem'] == "" and not d['rotational']:
+                    # Empty or partitioned SSD
+                    partitioned_space = sum(
+                        [p['size'] for p in d['Partitions']])
+                    dev.metadata_space_free = max(0, d[
+                        'size'] - partitioned_space)
+
+                devs.append(dev)
+
+            result.append(orchestrator.InventoryNode(node_name, devs))
+
+        return result
+
+    @deferred_read
+    def describe_service(self, service_type, service_id):
+        assert service_type in ("mds", "osd", "mon", "rgw")
+
+        pods = self.rook_cluster.describe_pods(service_type, service_id)
+
+        result = orchestrator.ServiceDescription()
+        for p in pods:
+            sl = orchestrator.ServiceLocation()
+            sl.nodename = p['nodename']
+            sl.container_id = p['name']
+
+            if service_type == "osd":
+                sl.daemon_name = "%s" % p['labels']["ceph-osd-id"]
+            elif service_type == "mds":
+                # MDS daemon names are the tail of the pod name with
+                # an 'm' prefix.
+                # TODO: Would be nice to get this out a label though.
+                sl.daemon_name = "m" + sl.container_id.split("-")[-1]
+            elif service_type == "mon":
+                sl.daemon_name = p['labels']["mon"]
+            elif service_type == "mgr":
+                # FIXME: put a label on the pod to consume
+                # from here
+                raise NotImplementedError("mgr")
+            elif service_type == "rgw":
+                # FIXME: put a label on the pod to consume
+                # from here
+                raise NotImplementedError("rgw")
+
+            result.locations.append(sl)
+
+        return result
+
+    def add_stateless_service(self, service_type, spec):
+        # assert isinstance(spec, orchestrator.StatelessServiceSpec)
+
+        if service_type == "mds":
+            return RookWriteCompletion(
+                lambda: self.rook_cluster.add_filesystem(spec), None,
+                "Creating Filesystem services for {0}".format(spec.name))
+        else:
+            # TODO: RGW, NFS
+            raise NotImplementedError(service_type)
+
+    def create_osds(self, spec):
+        # Validate spec.node
+        if not self.rook_cluster.node_exists(spec.node):
+            raise RuntimeError("Node '{0}' is not in the Kubernetes "
+                               "cluster".format(spec.node))
+
+        # Validate whether cluster CRD can accept individual OSD
+        # creations (i.e. not useAllDevices)
+        if not self.rook_cluster.can_create_osd():
+            raise RuntimeError("Rook cluster configuration does not "
+                               "support OSD creation.")
+
+        def execute():
+            self.rook_cluster.add_osds(spec)
+
+        def is_complete():
+            # Find OSD pods on this host
+            pod_osd_ids = set()
+            pods = self._k8s.list_namespaced_pod("rook-ceph",
+                                                 label_selector="rook_cluster=rook-ceph,app=rook-ceph-osd",
+                                                 field_selector="spec.nodeName={0}".format(
+                                                     spec.node
+                                                 )).items
+            for p in pods:
+                pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
+
+            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))
+
+            found = []
+            osdmap = self.get("osd_map")
+            for osd in osdmap['osds']:
+                osd_id = osd['osd']
+                if osd_id not in pod_osd_ids:
+                    continue
+
+                metadata = self.get_metadata('osd', "%s" % osd_id)
+                if metadata and metadata['devices'] in spec.drive_group.devices:
+                    found.append(osd_id)
+                else:
+                    self.log.info("ignoring osd {0} {1}".format(
+                        osd_id, metadata['devices']
+                    ))
+
+            return found is not None
+
+        return RookWriteCompletion(execute, is_complete,
+                                   "Creating OSD on {0}:{1}".format(
+                                       spec.node,
+                                       spec.drive_group.devices
+                                   ))
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
new file mode 100644 (file)
index 0000000..c4046fe
--- /dev/null
@@ -0,0 +1,320 @@
+"""
+This module wrap's Rook + Kubernetes APIs to expose the calls
+needed to implement an orchestrator module.  While the orchestrator
+module exposes an async API, this module simply exposes blocking API
+call methods.
+
+This module is runnable outside of ceph-mgr, useful for testing.
+"""
+
+import urlparse
+import logging
+import json
+
+# Optional kubernetes imports to enable MgrModule.can_run
+# to behave cleanly.
+try:
+    from kubernetes.client.rest import ApiException
+except ImportError:
+    ApiException = None
+
+ROOK_SYSTEM_NS = "rook-ceph-system"
+ROOK_API_VERSION = "v1alpha1"
+ROOK_API_NAME = "ceph.rook.io/%s" % ROOK_API_VERSION
+
+log = logging.getLogger('rook')
+
+
+class ApplyException(Exception):
+    """
+    For failures to update the Rook CRDs, usually indicating
+    some kind of interference between our attempted update
+    and other conflicting activity.
+    """
+
+
+class RookCluster(object):
+    def __init__(self, k8s, cluster_name):
+        self.cluster_name = cluster_name
+        self.k8s = k8s
+
+    @property
+    def rook_namespace(self):
+        # For the moment, assume Rook NS always equal to cluster name
+        # (this is also assumed some places in Rook source, may
+        #  be formalized at some point)
+        return self.cluster_name
+
+    def init_rook(self):
+        """
+        Create a passive Rook configuration for this Ceph cluster.  This
+        will prompt Rook to start watching for other resources within
+        the cluster (e.g. Filesystem CRDs), but no other action will happen.
+        """
+
+        # TODO: complete or remove this functionality: if Rook wasn't
+        # already running, then we would need to supply it with
+        # keys and ceph.conf as well as creating the cluster CRD
+
+        cluster_crd = {
+            "apiVersion": ROOK_API_NAME,
+            "kind": "Cluster",
+            "metadata": {
+                "name": self.cluster_name,
+                "namespace": self.cluster_name
+            },
+            "spec": {
+                "backend": "ceph",
+                "hostNetwork": True
+            }
+        }
+
+        self.rook_api_post("clusters", body=cluster_crd)
+
+    def rook_url(self, path):
+        prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
+            ROOK_API_VERSION, self.rook_namespace)
+        return urlparse.urljoin(prefix, path)
+
+    def rook_api_call(self, verb, path, **kwargs):
+        full_path = self.rook_url(path)
+        log.debug("[%s] %s" % (verb, full_path))
+
+        return self.k8s.api_client.call_api(
+            full_path,
+            verb,
+            auth_settings=['BearerToken'],
+            response_type="object",
+            _return_http_data_only=True,
+            _preload_content=True,
+            **kwargs)
+
+    def rook_api_get(self, path, **kwargs):
+        return self.rook_api_call("GET", path, **kwargs)
+
+    def rook_api_patch(self, path, **kwargs):
+        return self.rook_api_call("PATCH", path,
+                                  header_params={"Content-Type": "application/json-patch+json"},
+                                  **kwargs)
+
+    def rook_api_post(self, path, **kwargs):
+        return self.rook_api_call("POST", path, **kwargs)
+
+    def get_discovered_devices(self, nodenames=None):
+        # TODO: replace direct k8s calls with Rook API calls
+        # when they're implemented
+        label_selector = "app=rook-discover"
+        if nodenames is not None:
+            # FIXME: is there a practical or official limit on the
+            # number of entries in a label selector
+            label_selector += ", rook.io/node in ({0})".format(
+                ", ".join(nodenames))
+
+        try:
+            result = self.k8s.list_namespaced_config_map(
+                ROOK_SYSTEM_NS,
+                label_selector=label_selector)
+        except ApiException as e:
+            log.warn("Failed to fetch device metadata: {0}".format(e))
+            raise
+
+        nodename_to_devices = {}
+        for i in result.items:
+            drives = json.loads(i.data['devices'])
+            nodename_to_devices[i.metadata.labels['rook.io/node']] = drives
+
+        return nodename_to_devices
+
+    def describe_pods(self, service_type, service_id):
+        # Go query the k8s API about deployment, containers related to this
+        # filesystem
+
+        # Inspect the Rook YAML, to decide whether this filesystem
+        # is Ceph-managed or Rook-managed
+        # TODO: extend Orchestrator interface to describe whether FS
+        # is manageable by us or not
+
+        # Example Rook Pod labels for a mgr daemon:
+        # Labels:         app=rook-ceph-mgr
+        #                 pod-template-hash=2171958073
+        #                 rook_cluster=rook
+        # And MDS containers additionally have `rook_filesystem` label
+
+        # Label filter is rook_cluster=<cluster name>
+        #                 rook_file_system=<self.fs_name>
+
+        label_filter = "rook_cluster={0},app=rook-ceph-{1}".format(
+            self.cluster_name, service_type)
+        if service_type == "mds":
+            label_filter += ",rook_file_system={0}".format(service_id)
+        elif service_type == "osd":
+            # Label added in https://github.com/rook/rook/pull/1698
+            label_filter += ",ceph-osd-id={0}".format(service_id)
+        elif service_type == "mon":
+            # label like mon=rook-ceph-mon0
+            label_filter += ",mon={0}".format(service_id)
+        elif service_type == "mgr":
+            # TODO: get Rook to label mgr pods
+            pass
+        elif service_type == "rgw":
+            # TODO: rgw
+            pass
+
+        pods = self.k8s.list_namespaced_pod(
+            self.rook_namespace,
+            label_selector=label_filter)
+
+        # import json
+        # print json.dumps(pods.items[0])
+
+        pods_summary = []
+
+        for p in pods.items:
+            d = p.to_dict()
+            # p['metadata']['creationTimestamp']
+            # p['metadata']['nodeName']
+            pods_summary.append({
+                "name": d['metadata']['name'],
+                "nodename": d['spec']['node_name'],
+                "labels": d['metadata']['labels']
+            })
+            pass
+
+        return pods_summary
+
+    def add_filesystem(self, spec):
+        # TODO use spec.placement
+        # TODO use spec.min_size (and use max_size meaningfully)
+        # TODO warn if spec.extended has entries we don't kow how
+        #      to action.
+
+        rook_fs = {
+            "apiVersion": ROOK_API_NAME,
+            "kind": "Filesystem",
+            "metadata": {
+                "name": spec.name,
+                "namespace": self.rook_namespace
+            },
+            "spec": {
+                "preservePoolsOnRemove": True,
+                "skipPoolCreation": True,
+                "metadataServer": {
+                    "activeCount": spec.max_size,
+                    "activeStandby": True
+
+                }
+            }
+        }
+
+        try:
+            self.rook_api_post(
+                "filesystems/",
+                body=rook_fs
+            )
+        except ApiException as e:
+            if e.status == 409:
+                log.info("Filesystem '{0}' already exists".format(spec.name))
+                # Idempotent, succeed.
+            else:
+                raise
+
+    def can_create_osd(self):
+        current_cluster = self.rook_api_get(
+            "clusters/{0}".format(self.cluster_name))
+        use_all_nodes = current_cluster['spec'].get('useAllNodes', False)
+
+        # If useAllNodes is set, then Rook will not be paying attention
+        # to anything we put in 'nodes', so can't do OSD creation.
+        return not use_all_nodes
+
+    def node_exists(self, node_name):
+        try:
+            self.k8s.read_node(node_name)
+        except ApiException as e:
+            if e.status == 404:
+                return False
+            else:
+                raise
+        else:
+            return True
+
+    def add_osds(self, spec):
+        """
+        Rook currently (0.8) can only do single-drive OSDs, so we
+        treat all drive groups as just a list of individual OSDs.
+        """
+        # assert isinstance(spec, orchestrator.OsdSpec)
+
+        block_devices = spec.drive_group.devices
+
+        assert spec.format in ("bluestore", "filestore")
+
+        # The CRD looks something like this:
+        #     nodes:
+        #       - name: "gravel1.rockery"
+        #         devices:
+        #          - name: "sdb"
+        #         storeConfig:
+        #           storeType: bluestore
+
+        current_cluster = self.rook_api_get(
+            "clusters/{0}".format(self.cluster_name))
+
+        patch = []
+
+        # FIXME: this is all not really atomic, because jsonpatch doesn't
+        # let us do "test" operations that would check if items with
+        # matching names were in existing lists.
+
+        if 'nodes' not in current_cluster['spec']['storage']:
+            patch.append({
+                'op': 'add', 'path': '/spec/storage/nodes', 'value': []
+            })
+
+        current_nodes = current_cluster['spec']['storage'].get('nodes', [])
+
+        if spec.node not in [n['name'] for n in current_nodes]:
+            patch.append({
+                "op": "add", "path": "/spec/storage/nodes/-", "value": {
+                    "name": spec.node,
+                    "devices": [{'name': d} for d in block_devices],
+                    "storeConfig": {
+                        "storeType": spec.format
+                    }
+                }
+            })
+        else:
+            # Extend existing node
+            node_idx = None
+            current_node = None
+            for i, c in enumerate(current_nodes):
+                if c['name'] == spec.node:
+                    current_node = c
+                    node_idx = i
+                    break
+
+            assert node_idx is not None
+            assert current_node is not None
+
+            new_devices = list(set(block_devices) - set([d['name'] for d in current_node['devices']]))
+
+            for n in new_devices:
+                patch.append({
+                    "op": "add",
+                    "path": "/spec/storage/nodes/{0}/devices/-".format(node_idx),
+                    "value": {'name': n}
+                })
+
+        if len(patch) == 0:
+            log.warning("No-op adding stateful service")
+            return
+
+        try:
+            self.rook_api_patch(
+                "clusters/{0}".format(self.cluster_name),
+                body=patch)
+        except ApiException as e:
+            log.exception("API exception: {0}".format(e.message))
+            raise ApplyException(
+                "Failed to create OSD entries in Cluster CRD: {0}".format(
+                    e.message))