From: Sebastian Wagner
Date: Fri, 23 Nov 2018 13:55:15 +0000 (+0100)
Subject: mgr/orchestrator: Add test orchestrator
X-Git-Tag: v14.1.0~573^2~1
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=933b2cfc28b7cf1eda5c26a68ffed9377c76a5cd;p=ceph-ci.git

mgr/orchestrator: Add test orchestrator

1. To be able to run the CLI without an external orchestrator.
2. To run the CLI in Teuthology.

Signed-off-by: Sebastian Wagner
---

diff --git a/qa/suites/rados/mgr/tasks/orchestrator_cli.yaml b/qa/suites/rados/mgr/tasks/orchestrator_cli.yaml
new file mode 100644
index 00000000000..65b1d78b158
--- /dev/null
+++ b/qa/suites/rados/mgr/tasks/orchestrator_cli.yaml
@@ -0,0 +1,18 @@
+
+tasks:
+  - install:
+  - ceph:
+      # tests may leave mgrs broken, so don't try and call into them
+      # to invoke e.g. pg dump during teardown.
+      wait-for-scrub: false
+      log-whitelist:
+        - overall HEALTH_
+        - \(MGR_DOWN\)
+        - \(MGR_INSIGHTS_WARNING\)
+        - \(insights_health_check
+        - \(PG_
+        - replacing it with standby
+        - No standby daemons available
+  - cephfs_test_runner:
+      modules:
+        - tasks.mgr.test_orchestrator_cli
\ No newline at end of file
diff --git a/qa/tasks/mgr/test_orchestrator_cli.py b/qa/tasks/mgr/test_orchestrator_cli.py
new file mode 100644
index 00000000000..b6a4cc61e54
--- /dev/null
+++ b/qa/tasks/mgr/test_orchestrator_cli.py
@@ -0,0 +1,33 @@
+import logging
+
+from mgr_test_case import MgrTestCase
+
+
+log = logging.getLogger(__name__)
+
+
+class TestOrchestratorCli(MgrTestCase):
+    MGRS_REQUIRED = 1
+
+    def _orch_cmd(self, *args):
+        retstr = self.mgr_cluster.mon_manager.raw_cluster_cmd("orchestrator", *args)
+        return retstr
+
+    def setUp(self):
+        super(TestOrchestratorCli, self).setUp()
+
+        self._load_module("orchestrator_cli")
+        self._load_module("test_orchestrator")
+        self._orch_cmd("set", "backend", "test_orchestrator")
+
+    def test_status(self):
+        ret = self._orch_cmd("status")
+        self.assertIn("test_orchestrator", ret)
+
+    def test_device_ls(self):
+        ret = self._orch_cmd("device", "ls")
+        self.assertIn("localhost:", ret)
+
+    def test_service_ls(self):
+        ret = self._orch_cmd("service", "ls")
+        self.assertIn("ceph-mgr", ret)
diff --git a/src/pybind/mgr/orchestrator_cli/README.md b/src/pybind/mgr/orchestrator_cli/README.md
new file mode 100644
index 00000000000..97e8a1069af
--- /dev/null
+++ b/src/pybind/mgr/orchestrator_cli/README.md
@@ -0,0 +1,16 @@
+# Orchestrator CLI
+
+See also [../../../doc/mgr/orchestrator_cli.rst](../../../doc/mgr/orchestrator_cli.rst).
+
+## Running the Teuthology tests
+
+To run the API tests against a real Ceph cluster, we leverage the Teuthology
+framework and the `test_orchestrator` backend.
+
+`source` the script and run the tests manually:
+
+    $ source ../dashboard/run-backend-api-tests.sh
+    $ run_teuthology_tests tasks.mgr.test_orchestrator_cli
+    $ cleanup_teuthology
+
+
\ No newline at end of file
diff --git a/src/pybind/mgr/test_orchestrator/__init__.py b/src/pybind/mgr/test_orchestrator/__init__.py
new file mode 100644
index 00000000000..2c4d30973a8
--- /dev/null
+++ b/src/pybind/mgr/test_orchestrator/__init__.py
@@ -0,0 +1 @@
+from .module import TestOrchestrator
diff --git a/src/pybind/mgr/test_orchestrator/module.py b/src/pybind/mgr/test_orchestrator/module.py
new file mode 100644
index 00000000000..324aeb07612
--- /dev/null
+++ b/src/pybind/mgr/test_orchestrator/module.py
@@ -0,0 +1,251 @@
+import json
+import re
+import threading
+import functools
+import uuid
+from subprocess import check_output
+
+from mgr_module import MgrModule
+
+import orchestrator
+
+
+all_completions = []
+
+
+class TestReadCompletion(orchestrator.ReadCompletion):
+
+    def __init__(self, cb):
+        super(TestReadCompletion, self).__init__()
+        self.cb = cb
+        self._result = None
+        self._complete = False
+
+        self.message = ""
+
+        global all_completions
+        all_completions.append(self)
+
+    @property
+    def result(self):
+        return self._result
+
+    @property
+    def is_complete(self):
+        return self._complete
+
+    def execute(self):
+        self._result = self.cb()
+        self._complete = True
+
+
+class TestWriteCompletion(orchestrator.WriteCompletion):
+    def __init__(self, execute_cb, complete_cb, message):
+        super(TestWriteCompletion, self).__init__()
+        self.execute_cb = execute_cb
+        self.complete_cb = complete_cb
+
+        # "executed" means the API call has been issued; it may or may
+        # not have succeeded
+        self.executed = False
+
+        self._result = None
+
+        self.effective = False
+
+        self.id = str(uuid.uuid4())
+
+        self.message = message
+
+        self.error = None
+
+        # XXX hacky global
+        global all_completions
+        all_completions.append(self)
+
+    @property
+    def is_persistent(self):
+        return (not self.is_errored) and self.executed
+
+    @property
+    def is_effective(self):
+        return self.effective
+
+    @property
+    def is_errored(self):
+        return self.error is not None
+
+    def execute(self):
+        if not self.executed:
+            self._result = self.execute_cb()
+            self.executed = True
+
+        if not self.effective:
+            # TODO: check self.result for API errors
+            if self.complete_cb is None:
+                self.effective = True
+            else:
+                self.effective = self.complete_cb()
+
+
+def deferred_read(f):
+    """
+    Decorator to make TestOrchestrator methods return
+    a completion object that executes itself.
+    """
+
+    @functools.wraps(f)
+    def wrapper(*args, **kwargs):
+        return TestReadCompletion(lambda: f(*args, **kwargs))
+
+    return wrapper
+
+
+class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
+    """
+    This is an orchestrator implementation used for internal testing. It's
+    meant for development environments and integration testing.
+
+    It does not actually deploy or manage anything.
+
+    The implementation is similar to the Rook orchestrator, but simpler.
+    """
+
+    def _progress(self, *args, **kwargs):
+        try:
+            self.remote("progress", *args, **kwargs)
+        except ImportError:
+            # If the progress module is disabled that's fine,
+            # they just won't see the output.
+            pass
+
+    def wait(self, completions):
+        self.log.info("wait: completions={0}".format(completions))
+
+        incomplete = False
+
+        # Our `wait` implementation is very simple because everything's
+        # just an API call.
+        for c in completions:
+            if not isinstance(c, TestReadCompletion) and \
+                    not isinstance(c, TestWriteCompletion):
+                raise TypeError(
+                    "wait() requires list of completions, not {0}".format(
+                        c.__class__
+                    ))
+
+            if c.is_complete:
+                continue
+
+            if not c.is_read:
+                self._progress("update", c.id, c.message, 0.5)
+
+            try:
+                c.execute()
+            except Exception as e:
+                self.log.exception("Completion {0} threw an exception:".format(
+                    c.message
+                ))
+                c.error = e
+                c._complete = True
+                if not c.is_read:
+                    self._progress("complete", c.id)
+            else:
+                if c.is_complete:
+                    if not c.is_read:
+                        self._progress("complete", c.id)
+
+            if not c.is_complete:
+                incomplete = True
+
+        return not incomplete
+
+    def available(self):
+        return True, ""
+
+    def __init__(self, *args, **kwargs):
+        super(TestOrchestrator, self).__init__(*args, **kwargs)
+
+        self._initialized = threading.Event()
+        self._shutdown = threading.Event()
+
+    def shutdown(self):
+        self._shutdown.set()
+
+    def serve(self):
+
+        self._initialized.set()
+
+        while not self._shutdown.is_set():
+            # XXX hack (or is it?) to kick all completions periodically,
+            # in case we had a caller that wait()'ed on them long enough
+            # to get persistence but not long enough to get completion
+
+            global all_completions
+            self.wait(all_completions)
+            all_completions = [c for c in all_completions if not c.is_complete]
+
+            self._shutdown.wait(5)
+
+    @deferred_read
+    def get_inventory(self, node_filter=None):
+        """
+        There is no guarantee which devices are returned by get_inventory.
+        """
+        try:
+            c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
+        except OSError:
+            cmd = """
+            . /tmp/ceph-volume-virtualenv/bin/activate
+            ceph-volume inventory --format json
+            """
+            c_v_out = check_output(cmd, shell=True)
+
+        for out in c_v_out.splitlines():
+            if not out.startswith(b'-->') and not out.startswith(b' stderr'):
+                self.log.error(out)
+                devs = []
+                for device in json.loads(out):
+                    dev = orchestrator.InventoryDevice()
+                    if device["sys_api"]["rotational"] == "1":
+                        dev.type = 'hdd'  # 'ssd', 'hdd', 'nvme'
+                    elif 'nvme' in device["path"]:
+                        dev.type = 'nvme'
+                    else:
+                        dev.type = 'ssd'
+                    dev.size = device['sys_api']['size']
+                    dev.id = device['path']
+                    dev.extended = device
+                    devs.append(dev)
+                return [orchestrator.InventoryNode('localhost', devs)]
+        self.log.error('c-v failed: ' + str(c_v_out))
+        raise Exception('c-v failed')
+
+    @deferred_read
+    def describe_service(self, service_type=None, service_id=None, node_name=None):
+        """
+        There is no guarantee which daemons are returned by describe_service, except that
+        it returns the mgr we're running in.
+        """
+        if service_type:
+            assert service_type in ("mds", "osd", "mon", "rgw", "mgr"), service_type + " unsupported"
+
+        out = map(str, check_output(['ps', 'aux']).splitlines())
+        types = [service_type] if service_type else ("mds", "osd", "mon", "rgw", "mgr")
+        processes = [p for p in out if any([('ceph-' + t in p) for t in types])]
+
+        result = []
+        for p in processes:
+            sd = orchestrator.ServiceDescription()
+            sd.nodename = 'localhost'
+            sd.daemon_name = re.search('ceph-[^ ]+', p).group()
+            result.append(sd)
+
+        return result
+
+    def add_stateless_service(self, service_type, spec):
+        raise NotImplementedError(service_type)
+
+    def create_osds(self, spec):
+        raise NotImplementedError(str(spec))
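
For manual testing, the same backend can also be exercised by hand against a local development
cluster (for example one started with vstart.sh). The commands below are a minimal sketch, not
part of the patch: they assume such a cluster with a running mgr, and they simply mirror the
calls made by qa/tasks/mgr/test_orchestrator_cli.py above.

    $ ceph mgr module enable orchestrator_cli
    $ ceph mgr module enable test_orchestrator
    $ ceph orchestrator set backend test_orchestrator
    $ ceph orchestrator status
    $ ceph orchestrator device ls
    $ ceph orchestrator service ls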