From 9e926e9927a4c9592403dbce959e526ba3860206 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Mon, 28 Jan 2019 16:57:38 +0100 Subject: [PATCH] mgr/orchestrator: Unify `osd create` and `osd add` Also: * Added some more tests * Better validation of drive Groups * Simplified `TestWriteCompletion` Signed-off-by: Sebastian Wagner --- qa/tasks/mgr/test_orchestrator_cli.py | 30 +++- src/pybind/mgr/ansible/module.py | 2 +- src/pybind/mgr/mgr_module.py | 7 + src/pybind/mgr/orchestrator.py | 44 +++++- src/pybind/mgr/orchestrator_cli/module.py | 130 +++++++----------- .../mgr/orchestrator_cli/test_orchestrator.py | 4 +- src/pybind/mgr/test_orchestrator/module.py | 41 ++++-- 7 files changed, 152 insertions(+), 106 deletions(-) diff --git a/qa/tasks/mgr/test_orchestrator_cli.py b/qa/tasks/mgr/test_orchestrator_cli.py index c8964f25fac9..e85444cba6ae 100644 --- a/qa/tasks/mgr/test_orchestrator_cli.py +++ b/qa/tasks/mgr/test_orchestrator_cli.py @@ -1,4 +1,8 @@ +import json import logging +from tempfile import NamedTemporaryFile + +from teuthology.exceptions import CommandFailedError from mgr_test_case import MgrTestCase @@ -10,8 +14,15 @@ class TestOrchestratorCli(MgrTestCase): MGRS_REQUIRED = 1 def _orch_cmd(self, *args): - retstr = self.mgr_cluster.mon_manager.raw_cluster_cmd("orchestrator", *args) - return retstr + return self.mgr_cluster.mon_manager.raw_cluster_cmd("orchestrator", *args) + + def _orch_cmd_result(self, *args, **kwargs): + """ + superfluous, but raw_cluster_cmd doesn't support kwargs. + """ + res = self.mgr_cluster.mon_manager.raw_cluster_cmd_result("orchestrator", *args, **kwargs) + self.assertEqual(res, 0) + def setUp(self): super(TestOrchestratorCli, self).setUp() @@ -41,3 +52,18 @@ class TestOrchestratorCli(MgrTestCase): self._orch_cmd("service-instance", "reload", "mds", "a") self._orch_cmd("service-instance", "stop", "mds", "a") self._orch_cmd("service-instance", "start", "mds", "a") + + def test_osd_create(self): + self._orch_cmd("osd", "create", "*:device") + self._orch_cmd("osd", "create", "*:device,device2") + + drive_group = { + "host_pattern": "*", + "data_devices": {"paths": ["/dev/sda"]} + } + + self._orch_cmd_result("osd", "create", "-i", "-", stdin=json.dumps(drive_group)) + + with self.assertRaises(CommandFailedError): + self._orch_cmd("osd", "create", "notfound:device") + diff --git a/src/pybind/mgr/ansible/module.py b/src/pybind/mgr/ansible/module.py index d22f1bb6c636..3d7ad105b2c8 100644 --- a/src/pybind/mgr/ansible/module.py +++ b/src/pybind/mgr/ansible/module.py @@ -290,7 +290,7 @@ class Module(MgrModule, orchestrator.Orchestrator): return ansible_operation - def create_osds(self, drive_group, all_hosts=None): + def create_osds(self, drive_group, all_hosts): """Create one or more OSDs within a single Drive Group. If no host provided the operation affects all the host in the OSDS role diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index 2933389f47a8..50477f7f4626 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -1158,6 +1158,13 @@ class MgrModule(ceph_module.BaseMgrModule): Invoke a method on another module. All arguments, and the return value from the other module must be serializable. + Limitation: Do not import any modules within the called method. + Otherwise you will get an error in Python 2:: + + RuntimeError('cannot unmarshal code objects in restricted execution mode',) + + + :param module_name: Name of other module. If module isn't loaded, an ImportError exception is raised. :param method_name: Method name. If it does not exist, a NameError diff --git a/src/pybind/mgr/orchestrator.py b/src/pybind/mgr/orchestrator.py index b2616164060d..168d533e3d1c 100644 --- a/src/pybind/mgr/orchestrator.py +++ b/src/pybind/mgr/orchestrator.py @@ -4,6 +4,8 @@ ceph-mgr orchestrator interface Please see the ceph-mgr module developer's guide for more information. """ +import six + try: from typing import TypeVar, Generic, List, Optional, Union T = TypeVar('T') @@ -12,6 +14,7 @@ except ImportError: T, G = object, object import time +import fnmatch class _Completion(G): @@ -255,7 +258,7 @@ class Orchestrator(object): assert not (service_name and service_id) raise NotImplementedError() - def create_osds(self, drive_group, all_hosts=None): + def create_osds(self, drive_group, all_hosts): # type: (DriveGroupSpec, List[str]) -> WriteCompletion """ Create one or more OSDs within a single Drive Group. @@ -509,8 +512,6 @@ class DeviceSelection(object): #: List of absolute paths to the devices. self.paths = paths # type: List[str] - if self.paths and any(p is not None for p in [id_model, size, rotates, count]): - raise TypeError('`paths` and other parameters are mutually exclusive') #: A wildcard string. e.g: "SDD*" self.id_model = id_model @@ -525,12 +526,29 @@ class DeviceSelection(object): #: if this is present limit the number of drives to this number. self.count = count + self.validate() + + def validate(self): + props = [self.id_model, self.size, self.rotates, self.count] + if self.paths and any(p is not None for p in props): + raise DriveGroupValidationError('DeviceSelection: `paths` and other parameters are mutually exclusive') + if not any(p is not None for p in [self.paths] + props): + raise DriveGroupValidationError('DeviceSelection cannot be empty') @classmethod def from_json(cls, device_spec): return cls(**device_spec) +class DriveGroupValidationError(Exception): + """ + Defining an exception here is a bit problematic, cause you cannot properly catch it, + if it was raised in a different mgr module. + """ + + def __init__(self, msg): + super(DriveGroupValidationError, self).__init__('Failed to validate Drive Group: ' + msg) + class DriveGroupSpec(object): """ Describe a drive group in the same form that ceph-volume @@ -563,7 +581,6 @@ class DriveGroupSpec(object): #: To fully utilize nvme devices multiple osds are required. self.osds_per_device = osds_per_device - assert objectstore in ('filestore', 'bluestore') #: ``filestore`` or ``bluestore`` self.objectstore = objectstore @@ -585,7 +602,7 @@ class DriveGroupSpec(object): @classmethod def from_json(self, json_drive_group): """ - Initialize and verify 'Drive group' structure + Initialize 'Drive group' structure :param json_drive_group: A valid json string with a Drive Group specification @@ -595,9 +612,21 @@ class DriveGroupSpec(object): return DriveGroupSpec(**args) def hosts(self, all_hosts): - import fnmatch return fnmatch.filter(all_hosts, self.host_pattern) + def validate(self, all_hosts): + if not isinstance(self.host_pattern, six.string_types): + raise DriveGroupValidationError('host_pattern must be of type string') + + specs = [self.data_devices, self.db_devices, self.wal_devices, self.journal_devices] + for s in filter(None, specs): + s.validate() + if self.objectstore not in ('filestore', 'bluestore'): + raise DriveGroupValidationError("objectstore not in ('filestore', 'bluestore')") + if not self.hosts(all_hosts): + raise DriveGroupValidationError( + "host_pattern '{}' does not match any hosts".format(self.host_pattern)) + class StatelessServiceSpec(object): # Request to orchestrator for a group of stateless services @@ -702,7 +731,7 @@ def _mk_orch_methods(cls): return inner for meth in Orchestrator.__dict__: - if not meth.startswith('_') and meth not in ['is_orchestrator_module', 'available']: + if not meth.startswith('_') and meth not in ['is_orchestrator_module']: setattr(cls, meth, shim(meth)) return cls @@ -717,6 +746,7 @@ class OrchestratorClientMixin(Orchestrator): o = self._select_orchestrator() except AttributeError: o = self.remote('orchestrator_cli', '_select_orchestrator') + self.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(self.module_name, o, meth, args, kwargs)) return self.remote(o, meth, *args, **kwargs) def _orchestrator_wait(self, completions): diff --git a/src/pybind/mgr/orchestrator_cli/module.py b/src/pybind/mgr/orchestrator_cli/module.py index 916b4d0fcd98..e7fb57752623 100644 --- a/src/pybind/mgr/orchestrator_cli/module.py +++ b/src/pybind/mgr/orchestrator_cli/module.py @@ -1,5 +1,11 @@ import errno import json + +try: + from typing import Dict +except ImportError: + pass # just for type checking. + from mgr_module import MgrModule, HandleCommandResult import orchestrator @@ -40,14 +46,14 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): "perm": "r" }, { - 'cmd': "orchestrator osd add " - "name=svc_arg,type=CephString ", - "desc": "Create an OSD service", + 'cmd': "orchestrator osd create " + "name=svc_arg,type=CephString,req=false ", + "desc": "Create an OSD service. Either --svc_arg=host:drives or -i ", "perm": "rw" }, { 'cmd': "orchestrator osd rm " - "name=svc_id,type=CephString ", + "name=svc_id,type=CephString,n=N ", "desc": "Remove an OSD service", "perm": "rw" }, @@ -117,17 +123,6 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): "cmd": "orchestrator status", "desc": "Report configured backend and its status", "perm": "r" - }, - { 'cmd': "orchestrator osd create " - "name=drive_group,type=CephString,req=false ", - "desc": "OSD's creation following specification \ - in parameter or readed from -i input.", - "perm": "rw" - }, - { 'cmd': "orchestrator osd remove " - "name=osd_ids,type=CephInt,n=N ", - "desc": "Remove Osd's", - "perm": "rw" } ] @@ -227,80 +222,58 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): return HandleCommandResult(stdout="\n".join(lines)) - def _osd_add(self, cmd): - device_spec = cmd['svc_arg'] - try: - node_name, block_device = device_spec.split(":") - except TypeError: - return HandleCommandResult(-errno.EINVAL, - stderr="Invalid device spec, should be :") - - devs = orchestrator.DeviceSelection(paths=block_device) - spec = orchestrator.DriveGroupSpec(node_name, data_devices=devs) - - # TODO: Remove this and make the orchestrator composable - # or - # Probably this should be moved to each of the orchestrators, - # then we wouldn't need the "all_hosts" parameter at all. - host_completion = self.get_hosts() - self.wait([host_completion]) - all_hosts = [h.name for h in host_completion.result] - - completion = self.create_osds(spec, all_hosts=all_hosts) - self._orchestrator_wait([completion]) + def _create_osd(self, inbuf, cmd): + # type: (str, Dict[str, str]) -> HandleCommandResult + """Create one or more OSDs""" - return HandleCommandResult() + usage = """ +Usage: + ceph orchestrator osd create -i + ceph orchestrator osd create host:device1,device2,... +""" - def _create_osd(self, inbuf, cmd): - """Create one or more OSDs + if inbuf: + try: + drive_group = orchestrator.DriveGroupSpec.from_json(json.loads(inbuf)) + except ValueError as e: + msg = 'Failed to read JSON input: {}'.format(str(e)) + usage + return HandleCommandResult(-errno.EINVAL, stderr=msg) - :cmd : Arguments for the create osd - """ - #Obtain/validate parameters for the operation - cmdline_error = "" - if "drive_group" in cmd.keys(): - params = cmd["drive_group"] - elif inbuf: - params = inbuf else: - cmdline_error = "Please, use 'drive_group' parameter \ - or specify -i " + try: + node_name, block_device = cmd['svc_arg'].split(":") + block_devices = block_device.split(',') + except (TypeError, KeyError, ValueError): + msg = "Invalid host:device spec: '{}'".format(cmd['svc_arg']) + usage + return HandleCommandResult(-errno.EINVAL, stderr=msg) - if cmdline_error: - return HandleCommandResult(-errno.EINVAL, stderr=cmdline_error) + devs = orchestrator.DeviceSelection(paths=block_devices) + drive_group = orchestrator.DriveGroupSpec(node_name, data_devices=devs) - try: - json_dg = json.loads(params) - except ValueError as msg: - return HandleCommandResult(-errno.EINVAL, stderr=msg) - - # Create the drive group - drive_group = orchestrator.DriveGroupSpec.from_json(json_dg) - #Read other Drive_group + # TODO: Remove this and make the orchestrator composable + # Like a future or so. + host_completion = self.get_hosts() + self._orchestrator_wait([host_completion]) + all_hosts = [h.name for h in host_completion.result] - #Launch the operation in the orchestrator module - completion = self.create_osds(drive_group) + try: + drive_group.validate(all_hosts) + except orchestrator.DriveGroupValidationError as e: + return HandleCommandResult(-errno.EINVAL, stderr=str(e)) - #Wait until the operation finishes + completion = self.create_osds(drive_group, all_hosts) self._orchestrator_wait([completion]) + self.log.warning(str(completion.result)) + return HandleCommandResult(stdout=str(completion.result)) - #return result - return HandleCommandResult(stdout=completion.result) - - def _remove_osd(self, cmd): + def _osd_rm(self, cmd): """ Remove OSD's :cmd : Arguments for remove the osd """ - - #Launch the operation in the orchestrator module - completion = self.remove_osds(cmd["osd_ids"]) - - #Wait until the operation finishes + completion = self.remove_osds(cmd["svc_id"]) self._orchestrator_wait([completion]) - - #return result - return HandleCommandResult(stdout=completion.result) + return HandleCommandResult(stdout=str(completion.result)) def _add_stateless_svc(self, svc_type, spec): completion = self.add_stateless_service(svc_type, spec) @@ -334,9 +307,6 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): self._orchestrator_wait([completion]) return HandleCommandResult() - def _osd_rm(self, cmd): - return self._rm_stateless_svc("osd", cmd['svc_id']) - def _mds_rm(self, cmd): return self._rm_stateless_svc("mds", cmd['svc_id']) @@ -441,15 +411,13 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): except NotImplementedError: return HandleCommandResult(-errno.EINVAL, stderr="Command not found") - def _handle_command(self, _, cmd): + def _handle_command(self, inbuf, cmd): if cmd['prefix'] == "orchestrator device ls": return self._list_devices(cmd) elif cmd['prefix'] == "orchestrator service ls": return self._list_services(cmd) elif cmd['prefix'] == "orchestrator service status": return self._list_services(cmd) # TODO: create more detailed output - elif cmd['prefix'] == "orchestrator osd add": - return self._osd_add(cmd) elif cmd['prefix'] == "orchestrator osd rm": return self._osd_rm(cmd) elif cmd['prefix'] == "orchestrator mds add": @@ -473,7 +441,7 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule): elif cmd['prefix'] == "orchestrator status": return self._status() elif cmd['prefix'] == "orchestrator osd create": - return self._create_osd(_, cmd) + return self._create_osd(inbuf, cmd) elif cmd['prefix'] == "orchestrator osd remove": return self._remove_osd(cmd) else: diff --git a/src/pybind/mgr/orchestrator_cli/test_orchestrator.py b/src/pybind/mgr/orchestrator_cli/test_orchestrator.py index a33e2d6cd831..b0c8742b551c 100644 --- a/src/pybind/mgr/orchestrator_cli/test_orchestrator.py +++ b/src/pybind/mgr/orchestrator_cli/test_orchestrator.py @@ -2,7 +2,7 @@ from __future__ import absolute_import import pytest -from orchestrator import DriveGroupSpec, DeviceSelection +from orchestrator import DriveGroupSpec, DeviceSelection, DriveGroupValidationError def test_DriveGroup(): @@ -31,6 +31,6 @@ def test_drive_selection(): spec = DriveGroupSpec('node_name', data_devices=devs) assert spec.data_devices.paths == ['/dev/sda'] - with pytest.raises(TypeError, match='exclusive'): + with pytest.raises(DriveGroupValidationError, match='exclusive'): DeviceSelection(paths=['/dev/sda'], rotates=False) diff --git a/src/pybind/mgr/test_orchestrator/module.py b/src/pybind/mgr/test_orchestrator/module.py index cbe32695eee4..be7fbd57c5ae 100644 --- a/src/pybind/mgr/test_orchestrator/module.py +++ b/src/pybind/mgr/test_orchestrator/module.py @@ -26,6 +26,10 @@ class TestReadCompletion(orchestrator.ReadCompletion): global all_completions all_completions.append(self) + def __str__(self): + return "TestReadCompletion(result={} message={})".format(self.result, self.message) + + @property def result(self): return self._result @@ -40,10 +44,9 @@ class TestReadCompletion(orchestrator.ReadCompletion): class TestWriteCompletion(orchestrator.WriteCompletion): - def __init__(self, execute_cb, complete_cb, message): + def __init__(self, execute_cb, message): super(TestWriteCompletion, self).__init__() self.execute_cb = execute_cb - self.complete_cb = complete_cb # Executed means I executed my API call, it may or may # not have succeeded @@ -63,6 +66,13 @@ class TestWriteCompletion(orchestrator.WriteCompletion): global all_completions all_completions.append(self) + def __str__(self): + return "TestWriteCompletion(executed={} result={} id={} message={} error={})".format(self.executed, self._result, self.id, self.message, self.error) + + @property + def result(self): + return self._result + @property def is_persistent(self): return (not self.is_errored) and self.executed @@ -79,13 +89,18 @@ class TestWriteCompletion(orchestrator.WriteCompletion): if not self.executed: self._result = self.execute_cb() self.executed = True + self.effective = True - if not self.effective: - # TODO: check self.result for API errors - if self.complete_cb is None: - self.effective = True - else: - self.effective = self.complete_cb() + +def deferred_write(message): + def wrapper(f): + @functools.wraps(f) + def inner(*args, **kwargs): + args[0].log.warning('message' + message) + return TestWriteCompletion(lambda: f(*args, **kwargs), + '{}, args={}, kwargs={}'.format(message, args, kwargs)) + return inner + return wrapper def deferred_read(f): @@ -246,12 +261,12 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator): def add_stateless_service(self, service_type, spec): raise NotImplementedError(service_type) + @deferred_write("create_osds") def create_osds(self, drive_group, all_hosts): - raise NotImplementedError(str(drive_group)) + drive_group.validate(all_hosts) + + @deferred_write("service_action") def service_action(self, action, service_type, service_name=None, service_id=None): - return TestWriteCompletion( - lambda: True, None, - "Pretending to {} service {} (name={}, id={})".format( - action, service_type, service_name, service_id)) + pass -- 2.47.3