mgr/orchestrator: Add test orchestrator
author Sebastian Wagner <sebastian.wagner@suse.com>
Fri, 23 Nov 2018 13:55:15 +0000 (14:55 +0100)
committer Sebastian Wagner <sebastian.wagner@suse.com>
Thu, 20 Dec 2018 09:56:49 +0000 (10:56 +0100)
1. To be able to run the CLI without an external orchestrator.
2. To run the CLI in Teuthology.

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
qa/suites/rados/mgr/tasks/orchestrator_cli.yaml [new file with mode: 0644]
qa/tasks/mgr/test_orchestrator_cli.py [new file with mode: 0644]
src/pybind/mgr/orchestrator_cli/README.md [new file with mode: 0644]
src/pybind/mgr/test_orchestrator/__init__.py [new file with mode: 0644]
src/pybind/mgr/test_orchestrator/module.py [new file with mode: 0644]

diff --git a/qa/suites/rados/mgr/tasks/orchestrator_cli.yaml b/qa/suites/rados/mgr/tasks/orchestrator_cli.yaml
new file mode 100644 (file)
index 0000000..65b1d78
--- /dev/null
@@ -0,0 +1,18 @@
+
+tasks:
+  - install:
+  - ceph:
+      # tests may leave mgrs broken, so don't try to call into them
+      # to invoke e.g. pg dump during teardown.
+      wait-for-scrub: false
+      log-whitelist:
+        - overall HEALTH_
+        - \(MGR_DOWN\)
+        - \(MGR_INSIGHTS_WARNING\)
+        - \(insights_health_check
+        - \(PG_
+        - replacing it with standby
+        - No standby daemons available
+  - cephfs_test_runner:
+      modules:
+        - tasks.mgr.test_orchestrator_cli
\ No newline at end of file
diff --git a/qa/tasks/mgr/test_orchestrator_cli.py b/qa/tasks/mgr/test_orchestrator_cli.py
new file mode 100644 (file)
index 0000000..b6a4cc6
--- /dev/null
@@ -0,0 +1,33 @@
+import logging
+
+from mgr_test_case import MgrTestCase
+
+
+log = logging.getLogger(__name__)
+
+
+class TestOrchestratorCli(MgrTestCase):
+    MGRS_REQUIRED = 1
+
+    def _orch_cmd(self, *args):
+        retstr = self.mgr_cluster.mon_manager.raw_cluster_cmd("orchestrator", *args)
+        return retstr
+
+    def setUp(self):
+        super(TestOrchestratorCli, self).setUp()
+
+        self._load_module("orchestrator_cli")
+        self._load_module("test_orchestrator")
+        self._orch_cmd("set", "backend", "test_orchestrator")
+
+    def test_status(self):
+        ret = self._orch_cmd("status")
+        self.assertIn("test_orchestrator", ret)
+
+    def test_device_ls(self):
+        ret = self._orch_cmd("device", "ls")
+        self.assertIn("localhost:", ret)
+
+    def test_service_ls(self):
+        ret = self._orch_cmd("service", "ls")
+        self.assertIn("ceph-mgr", ret)
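
For context, `raw_cluster_cmd("orchestrator", *args)` above issues the
equivalent of `ceph orchestrator <args>` against the cluster, so the three
tests amount to the following minimal sketch (not part of the commit; it
assumes a running cluster with the `ceph` CLI on PATH and the modules
enabled as in `setUp()`):

    from subprocess import check_output

    def orch(*args):
        # Shell out to the mgr CLI exposed by the orchestrator_cli module.
        return check_output(("ceph", "orchestrator") + args).decode("utf-8")

    assert "test_orchestrator" in orch("status")   # backend set in setUp()
    assert "localhost:" in orch("device", "ls")    # from get_inventory()
    assert "ceph-mgr" in orch("service", "ls")     # from describe_service()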
diff --git a/src/pybind/mgr/orchestrator_cli/README.md b/src/pybind/mgr/orchestrator_cli/README.md
new file mode 100644 (file)
index 0000000..97e8a10
--- /dev/null
@@ -0,0 +1,16 @@
+# Orchestrator CLI
+
+See also [../../../doc/mgr/orchestrator_cli.rst](../../../doc/mgr/orchestrator_cli.rst).
+
+## Running the Teuthology tests
+
+To run the API tests against a real Ceph cluster, we leverage the Teuthology
+framework and the `test_orchestrator` backend.
+
+`source` the script and run the tests manually:
+
+    $ source ../dashboard/run-backend-api-tests.sh
+    $ run_teuthology_tests tasks.mgr.test_orchestrator_cli
+    $ cleanup_teuthology
+
\ No newline at end of file
diff --git a/src/pybind/mgr/test_orchestrator/__init__.py b/src/pybind/mgr/test_orchestrator/__init__.py
new file mode 100644 (file)
index 0000000..2c4d309
--- /dev/null
@@ -0,0 +1 @@
+from .module import TestOrchestrator
diff --git a/src/pybind/mgr/test_orchestrator/module.py b/src/pybind/mgr/test_orchestrator/module.py
new file mode 100644 (file)
index 0000000..324aeb0
--- /dev/null
@@ -0,0 +1,251 @@
+import json
+import re
+import threading
+import functools
+import uuid
+from subprocess import check_output
+
+from mgr_module import MgrModule
+
+import orchestrator
+
+
+all_completions = []
+
+
+class TestReadCompletion(orchestrator.ReadCompletion):
+
+    def __init__(self, cb):
+        super(TestReadCompletion, self).__init__()
+        self.cb = cb
+        self._result = None
+        self._complete = False
+
+        self.message = "<read op>"
+
+        global all_completions
+        all_completions.append(self)
+
+    @property
+    def result(self):
+        return self._result
+
+    @property
+    def is_complete(self):
+        return self._complete
+
+    def execute(self):
+        self._result = self.cb()
+        self._complete = True
+
+
+class TestWriteCompletion(orchestrator.WriteCompletion):
+    def __init__(self, execute_cb, complete_cb, message):
+        super(TestWriteCompletion, self).__init__()
+        self.execute_cb = execute_cb
+        self.complete_cb = complete_cb
+
+        # "Executed" means the API call was issued; it may
+        # or may not have succeeded.
+        self.executed = False
+
+        self._result = None
+
+        self.effective = False
+
+        self.id = str(uuid.uuid4())
+
+        self.message = message
+
+        self.error = None
+
+        # XXX hacky global
+        global all_completions
+        all_completions.append(self)
+
+    @property
+    def is_persistent(self):
+        return (not self.is_errored) and self.executed
+
+    @property
+    def is_effective(self):
+        return self.effective
+
+    @property
+    def is_errored(self):
+        return self.error is not None
+
+    def execute(self):
+        if not self.executed:
+            self._result = self.execute_cb()
+            self.executed = True
+
+        if not self.effective:
+            # TODO: check self.result for API errors
+            if self.complete_cb is None:
+                self.effective = True
+            else:
+                self.effective = self.complete_cb()
+
+
+def deferred_read(f):
+    """
+    Decorator to make TestOrchestrator methods return
+    a completion object that executes the method on demand.
+    """
+
+    @functools.wraps(f)
+    def wrapper(*args, **kwargs):
+        return TestReadCompletion(lambda: f(*args, **kwargs))
+
+    return wrapper
+
+
+class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
+    """
+    This is an orchestrator implementation used for internal testing. It's meant for
+    development environments and integration testing.
+
+    It does not deploy or modify anything; its read operations only inspect the local host.
+
+    The implementation is similar to the Rook orchestrator, but simpler.
+    """
+
+    def _progress(self, *args, **kwargs):
+        try:
+            self.remote("progress", *args, **kwargs)
+        except ImportError:
+            # If the progress module is disabled that's fine,
+            # they just won't see the output.
+            pass
+
+    def wait(self, completions):
+        self.log.info("wait: completions={0}".format(completions))
+
+        incomplete = False
+
+        # Our `wait` implementation is very simple because everything's
+        # just an API call.
+        for c in completions:
+            if not isinstance(c, TestReadCompletion) and \
+                    not isinstance(c, TestWriteCompletion):
+                raise TypeError(
+                    "wait() requires list of completions, not {0}".format(
+                        c.__class__
+                    ))
+
+            if c.is_complete:
+                continue
+
+            if not c.is_read:
+                self._progress("update", c.id, c.message, 0.5)
+
+            try:
+                c.execute()
+            except Exception as e:
+                self.log.exception("Completion {0} threw an exception:".format(
+                    c.message
+                ))
+                c.error = e
+                c._complete = True
+                if not c.is_read:
+                    self._progress("complete", c.id)
+            else:
+                if c.is_complete:
+                    if not c.is_read:
+                        self._progress("complete", c.id)
+
+            if not c.is_complete:
+                incomplete = True
+
+        return not incomplete
+
+    def available(self):
+        return True, ""
+
+    def __init__(self, *args, **kwargs):
+        super(TestOrchestrator, self).__init__(*args, **kwargs)
+
+        self._initialized = threading.Event()
+        self._shutdown = threading.Event()
+
+    def shutdown(self):
+        self._shutdown.set()
+
+    def serve(self):
+
+        self._initialized.set()
+
+        while not self._shutdown.is_set():
+            # XXX hack (or is it?) to kick all completions periodically,
+            # in case we had a caller that wait()'ed on them long enough
+            # to get persistence but not long enough to get completion
+
+            global all_completions
+            self.wait(all_completions)
+            all_completions = [c for c in all_completions if not c.is_complete]
+
+            self._shutdown.wait(5)
+
+    @deferred_read
+    def get_inventory(self, node_filter=None):
+        """
+        There is no guarantee which devices are returned by get_inventory.
+        """
+        try:
+            c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
+        except OSError:
+            cmd = """
+            . /tmp/ceph-volume-virtualenv/bin/activate
+            ceph-volume inventory --format json
+            """
+            c_v_out = check_output(cmd, shell=True)
+
+        for out in c_v_out.splitlines():
+            if not out.startswith(b'-->') and not out.startswith(b' stderr'):
+                self.log.error(out)
+                devs = []
+                for device in json.loads(out):
+                    dev = orchestrator.InventoryDevice()
+                    if device["sys_api"]["rotational"] == "1":
+                        dev.type = 'hdd'  # 'ssd', 'hdd', 'nvme'
+                    elif 'nvme' in device["path"]:
+                        dev.type = 'nvme'
+                    else:
+                        dev.type = 'ssd'
+                    dev.size = device['sys_api']['size']
+                    dev.id = device['path']
+                    dev.extended = device
+                    devs.append(dev)
+                return [orchestrator.InventoryNode('localhost', devs)]
+        self.log.error('c-v failed: ' + str(c_v_out))
+        raise Exception('c-v failed')
+
+    @deferred_read
+    def describe_service(self, service_type=None, service_id=None, node_name=None):
+        """
+        There is no guarantee which daemons are returned by describe_service, except that
+        it returns the mgr we're running in.
+        """
+        if service_type:
+            assert service_type in ("mds", "osd", "mon", "rgw", "mgr"), service_type + " unsupported"
+
+        out = check_output(['ps', 'aux']).decode('utf-8').splitlines()
+        types = [service_type] if service_type else ("mds", "osd", "mon", "rgw", "mgr")
+        processes = [p for p in out if any([('ceph-' + t in p) for t in types])]
+
+        result = []
+        for p in processes:
+            sd = orchestrator.ServiceDescription()
+            sd.nodename = 'localhost'
+            sd.daemon_name = re.search('ceph-[^ ]+', p).group()
+            result.append(sd)
+
+        return result
+
+    def add_stateless_service(self, service_type, spec):
+        raise NotImplementedError(service_type)
+
+    def create_osds(self, spec):
+        raise NotImplementedError(str(spec))
+