mgr/orchestrator: Unify `osd create` and `osd add` (26171/head)
author    Sebastian Wagner <sebastian.wagner@suse.com>  Mon, 28 Jan 2019 15:57:38 +0000 (16:57 +0100)
committer Sebastian Wagner <sebastian.wagner@suse.com>  Fri, 1 Feb 2019 09:10:59 +0000 (10:10 +0100)
Also:

* Added some more tests
* Better validation of Drive Groups
* Simplified `TestWriteCompletion`

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
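
The unified command accepts both an inline host:devices spec and a full Drive
Group passed via -i (stdin works as "-i -"). A quick sketch of both forms, with
placeholder host and device names:

    ceph orchestrator osd create host1:/dev/sda,/dev/sdb
    echo '{"host_pattern": "*", "data_devices": {"paths": ["/dev/sda"]}}' | \
        ceph orchestrator osd create -i -
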
qa/tasks/mgr/test_orchestrator_cli.py
src/pybind/mgr/ansible/module.py
src/pybind/mgr/mgr_module.py
src/pybind/mgr/orchestrator.py
src/pybind/mgr/orchestrator_cli/module.py
src/pybind/mgr/orchestrator_cli/test_orchestrator.py
src/pybind/mgr/test_orchestrator/module.py

index c8964f25fac9a724bf9e7d89ac9e4477a641446d..e85444cba6aeec2a2163f0a342bb57f36ef16c8d 100644 (file)
@@ -1,4 +1,8 @@
+import json
 import logging
+from tempfile import NamedTemporaryFile
+
+from teuthology.exceptions import CommandFailedError
 
 from mgr_test_case import MgrTestCase
 
@@ -10,8 +14,15 @@ class TestOrchestratorCli(MgrTestCase):
     MGRS_REQUIRED = 1
 
     def _orch_cmd(self, *args):
-        retstr = self.mgr_cluster.mon_manager.raw_cluster_cmd("orchestrator", *args)
-        return retstr
+        return self.mgr_cluster.mon_manager.raw_cluster_cmd("orchestrator", *args)
+
+    def _orch_cmd_result(self, *args, **kwargs):
+        """
+        Superfluous, but raw_cluster_cmd doesn't support kwargs.
+        """
+        res = self.mgr_cluster.mon_manager.raw_cluster_cmd_result("orchestrator", *args, **kwargs)
+        self.assertEqual(res, 0)
+
 
     def setUp(self):
         super(TestOrchestratorCli, self).setUp()
@@ -41,3 +52,18 @@ class TestOrchestratorCli(MgrTestCase):
         self._orch_cmd("service-instance", "reload", "mds", "a")
         self._orch_cmd("service-instance", "stop", "mds", "a")
         self._orch_cmd("service-instance", "start", "mds", "a")
+
+    def test_osd_create(self):
+        self._orch_cmd("osd", "create", "*:device")
+        self._orch_cmd("osd", "create", "*:device,device2")
+
+        drive_group = {
+            "host_pattern": "*",
+            "data_devices": {"paths": ["/dev/sda"]}
+        }
+
+        self._orch_cmd_result("osd", "create", "-i", "-", stdin=json.dumps(drive_group))
+
+        with self.assertRaises(CommandFailedError):
+            self._orch_cmd("osd", "create", "notfound:device")
+
index d22f1bb6c6365f321d3d49ab43984bff91d367fb..3d7ad105b2c833f2c812ed2df5bbc77c353168cf 100644 (file)
@@ -290,7 +290,7 @@ class Module(MgrModule, orchestrator.Orchestrator):
 
         return ansible_operation
 
-    def create_osds(self, drive_group, all_hosts=None):
+    def create_osds(self, drive_group, all_hosts):
         """Create one or more OSDs within a single Drive Group.
 If no host is provided, the operation affects all the hosts in the OSDS role
 
index 2933389f47a883e9d1834daa907a6ad9e6892d3c..50477f7f46261065e91aa0d98aaa1439c8ed5817 100644 (file)
@@ -1158,6 +1158,11 @@ class MgrModule(ceph_module.BaseMgrModule):
         Invoke a method on another module.  All arguments, and the return
         value from the other module must be serializable.
 
+        Limitation: Do not import any modules within the called method.
+        Otherwise you will get an error in Python 2::
+
+            RuntimeError('cannot unmarshal code objects in restricted execution mode',)
+
         :param module_name: Name of other module.  If module isn't loaded,
                             an ImportError exception is raised.
         :param method_name: Method name.  If it does not exist, a NameError
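
To make the documented Python 2 restriction concrete, a minimal sketch of the
failure mode (module and method names here are invented for illustration):

    # hypothetical mgr module "callee": importing at module scope is fine
    import json
    from mgr_module import MgrModule

    class Module(MgrModule):
        def get_report(self):
            # An `import json` placed *here* instead would, under Python 2,
            # raise when invoked via remote():
            #   RuntimeError: cannot unmarshal code objects in restricted
            #                 execution mode
            return json.dumps({'ok': True})

    # hypothetical caller, running in another mgr module:
    #   self.remote('callee', 'get_report')
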
index b2616164060df261ecbfa6e30a3caf06d666b01b..168d533e3d1c78444d14adbc5b50b5a9d0a7b34f 100644 (file)
@@ -4,6 +4,8 @@ ceph-mgr orchestrator interface
 
 Please see the ceph-mgr module developer's guide for more information.
 """
+import six
+
 try:
     from typing import TypeVar, Generic, List, Optional, Union
     T = TypeVar('T')
@@ -12,6 +14,7 @@ except ImportError:
     T, G = object, object
 
 import time
+import fnmatch
 
 
 class _Completion(G):
@@ -255,7 +258,7 @@ class Orchestrator(object):
         assert not (service_name and service_id)
         raise NotImplementedError()
 
-    def create_osds(self, drive_group, all_hosts=None):
+    def create_osds(self, drive_group, all_hosts):
         # type: (DriveGroupSpec, List[str]) -> WriteCompletion
         """
         Create one or more OSDs within a single Drive Group.
@@ -509,8 +512,6 @@ class DeviceSelection(object):
 
         #: List of absolute paths to the devices.
         self.paths = paths  # type: List[str]
-        if self.paths and any(p is not None for p in [id_model, size, rotates, count]):
-            raise TypeError('`paths` and other parameters are mutually exclusive')
 
         #: A wildcard string. e.g: "SDD*"
         self.id_model = id_model
@@ -525,12 +526,29 @@ class DeviceSelection(object):
 
         #: if this is present limit the number of drives to this number.
         self.count = count
+        self.validate()
+
+    def validate(self):
+        props = [self.id_model, self.size, self.rotates, self.count]
+        if self.paths and any(p is not None for p in props):
+            raise DriveGroupValidationError('DeviceSelection: `paths` and other parameters are mutually exclusive')
+        if not any(p is not None for p in [self.paths] + props):
+            raise DriveGroupValidationError('DeviceSelection cannot be empty')
 
     @classmethod
     def from_json(cls, device_spec):
         return cls(**device_spec)
 
 
+class DriveGroupValidationError(Exception):
+    """
+    Defining an exception here is a bit problematic, because you cannot properly
+    catch it if it was raised in a different mgr module.
+    """
+
+    def __init__(self, msg):
+        super(DriveGroupValidationError, self).__init__('Failed to validate Drive Group: ' + msg)
+
 class DriveGroupSpec(object):
     """
     Describe a drive group in the same form that ceph-volume
@@ -563,7 +581,6 @@ class DriveGroupSpec(object):
         #: To fully utilize nvme devices multiple osds are required.
         self.osds_per_device = osds_per_device
 
-        assert objectstore in ('filestore', 'bluestore')
         #: ``filestore`` or ``bluestore``
         self.objectstore = objectstore
 
@@ -585,7 +602,7 @@ class DriveGroupSpec(object):
     @classmethod
     def from_json(self, json_drive_group):
         """
-        Initialize and verify 'Drive group' structure
+        Initialize 'Drive group' structure
 
         :param json_drive_group: A valid json string with a Drive Group
                specification
@@ -595,9 +612,21 @@ class DriveGroupSpec(object):
         return DriveGroupSpec(**args)
 
     def hosts(self, all_hosts):
-        import fnmatch
         return fnmatch.filter(all_hosts, self.host_pattern)
 
+    def validate(self, all_hosts):
+        if not isinstance(self.host_pattern, six.string_types):
+            raise DriveGroupValidationError('host_pattern must be of type string')
+
+        specs = [self.data_devices, self.db_devices, self.wal_devices, self.journal_devices]
+        for s in filter(None, specs):
+            s.validate()
+        if self.objectstore not in ('filestore', 'bluestore'):
+            raise DriveGroupValidationError("objectstore not in ('filestore', 'bluestore')")
+        if not self.hosts(all_hosts):
+            raise DriveGroupValidationError(
+                "host_pattern '{}' does not match any hosts".format(self.host_pattern))
+
 
 class StatelessServiceSpec(object):
     # Request to orchestrator for a group of stateless services
@@ -702,7 +731,7 @@ def _mk_orch_methods(cls):
         return inner
 
     for meth in Orchestrator.__dict__:
-        if not meth.startswith('_') and meth not in ['is_orchestrator_module', 'available']:
+        if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
             setattr(cls, meth, shim(meth))
     return cls
 
@@ -717,6 +746,7 @@ class OrchestratorClientMixin(Orchestrator):
             o = self._select_orchestrator()
         except AttributeError:
             o = self.remote('orchestrator_cli', '_select_orchestrator')
+        self.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(self.module_name, o, meth, args, kwargs))
         return self.remote(o, meth, *args, **kwargs)
 
     def _orchestrator_wait(self, completions):
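
The net effect of the new validation hooks, as a small sketch (host names are
invented; the import path matches the unit tests below):

    from orchestrator import (DriveGroupSpec, DeviceSelection,
                              DriveGroupValidationError)

    spec = DriveGroupSpec.from_json({
        "host_pattern": "node*",
        "data_devices": {"paths": ["/dev/sda"]},
    })
    spec.validate(all_hosts=['node1', 'node2'])  # ok: pattern matches both hosts

    # DeviceSelection now validates itself on construction:
    for bad_kwargs in ({'paths': ['/dev/sda'], 'rotates': False},  # mutually exclusive
                       {}):                                        # empty selection
        try:
            DeviceSelection(**bad_kwargs)
        except DriveGroupValidationError as e:
            print(e)  # "Failed to validate Drive Group: ..."
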
index 916b4d0fcd980d1f01c84bc2537a86a9a4cb2e9b..e7fb57752623587ae8eeb11f24e82db2e3dfd49c 100644 (file)
@@ -1,5 +1,11 @@
 import errno
 import json
+
+try:
+    from typing import Dict
+except ImportError:
+    pass  # just for type checking.
+
 from mgr_module import MgrModule, HandleCommandResult
 
 import orchestrator
@@ -40,14 +46,14 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
             "perm": "r"
         },
         {
-            'cmd': "orchestrator osd add "
-                   "name=svc_arg,type=CephString ",
-            "desc": "Create an OSD service",
+            'cmd': "orchestrator osd create "
+                   "name=svc_arg,type=CephString,req=false ",
+            "desc": "Create an OSD service. Either --svc_arg=host:drives or -i <drive_group>",
             "perm": "rw"
         },
         {
             'cmd': "orchestrator osd rm "
-                   "name=svc_id,type=CephString ",
+                   "name=svc_id,type=CephString,n=N ",
             "desc": "Remove an OSD service",
             "perm": "rw"
         },
@@ -117,17 +123,6 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
             "cmd": "orchestrator status",
             "desc": "Report configured backend and its status",
             "perm": "r"
-        },
-        {   'cmd': "orchestrator osd create "
-                   "name=drive_group,type=CephString,req=false ",
-            "desc": "OSD's creation following specification \
-                     in <drive_group> parameter or readed from -i <file> input.",
-            "perm": "rw"
-        },
-        {   'cmd': "orchestrator osd remove "
-                   "name=osd_ids,type=CephInt,n=N ",
-            "desc": "Remove Osd's",
-            "perm": "rw"
         }
     ]
 
@@ -227,80 +222,58 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
 
             return HandleCommandResult(stdout="\n".join(lines))
 
-    def _osd_add(self, cmd):
-        device_spec = cmd['svc_arg']
-        try:
-            node_name, block_device = device_spec.split(":")
-        except TypeError:
-            return HandleCommandResult(-errno.EINVAL,
-                                       stderr="Invalid device spec, should be <node>:<device>")
-
-        devs = orchestrator.DeviceSelection(paths=block_device)
-        spec = orchestrator.DriveGroupSpec(node_name, data_devices=devs)
-
-        # TODO: Remove this and make the orchestrator composable
-        # or
-        # Probably this should be moved to each of the orchestrators,
-        # then we wouldn't need the "all_hosts" parameter at all.
-        host_completion = self.get_hosts()
-        self.wait([host_completion])
-        all_hosts = [h.name for h in host_completion.result]
-
-        completion = self.create_osds(spec, all_hosts=all_hosts)
-        self._orchestrator_wait([completion])
+    def _create_osd(self, inbuf, cmd):
+        # type: (str, Dict[str, str]) -> HandleCommandResult
+        """Create one or more OSDs"""
 
-        return HandleCommandResult()
+        usage = """
+Usage:
+  ceph orchestrator osd create -i <json_file>
+  ceph orchestrator osd create host:device1,device2,...
+"""
 
-    def _create_osd(self, inbuf, cmd):
-        """Create one or more OSDs
+        if inbuf:
+            try:
+                drive_group = orchestrator.DriveGroupSpec.from_json(json.loads(inbuf))
+            except ValueError as e:
+                msg = 'Failed to read JSON input: {}'.format(str(e)) + usage
+                return HandleCommandResult(-errno.EINVAL, stderr=msg)
 
-        :cmd : Arguments for the create osd
-        """
-        #Obtain/validate parameters for the operation
-        cmdline_error = ""
-        if "drive_group" in cmd.keys():
-            params = cmd["drive_group"]
-        elif inbuf:
-            params = inbuf
         else:
-            cmdline_error = "Please, use 'drive_group' parameter \
-                             or specify -i <input file>"
+            try:
+                node_name, block_device = cmd['svc_arg'].split(":")
+                block_devices = block_device.split(',')
+            except (TypeError, KeyError, ValueError):
+                msg = "Invalid host:device spec: '{}'".format(cmd['svc_arg']) + usage
+                return HandleCommandResult(-errno.EINVAL, stderr=msg)
 
-        if cmdline_error:
-            return HandleCommandResult(-errno.EINVAL, stderr=cmdline_error)
+            devs = orchestrator.DeviceSelection(paths=block_devices)
+            drive_group = orchestrator.DriveGroupSpec(node_name, data_devices=devs)
 
-        try:
-            json_dg = json.loads(params)
-        except ValueError as msg:
-            return HandleCommandResult(-errno.EINVAL, stderr=msg)
-
-        # Create the drive group
-        drive_group = orchestrator.DriveGroupSpec.from_json(json_dg)
-        #Read other Drive_group
+        # TODO: Remove this and make the orchestrator composable
+        #   Like a future or so.
+        host_completion = self.get_hosts()
+        self._orchestrator_wait([host_completion])
+        all_hosts = [h.name for h in host_completion.result]
 
-        #Launch the operation in the orchestrator module
-        completion = self.create_osds(drive_group)
+        try:
+            drive_group.validate(all_hosts)
+        except orchestrator.DriveGroupValidationError as e:
+            return HandleCommandResult(-errno.EINVAL, stderr=str(e))
 
-        #Wait until the operation finishes
+        completion = self.create_osds(drive_group, all_hosts)
         self._orchestrator_wait([completion])
+        self.log.warning(str(completion.result))
+        return HandleCommandResult(stdout=str(completion.result))
 
-        #return result
-        return HandleCommandResult(stdout=completion.result)
-
-    def _remove_osd(self, cmd):
+    def _osd_rm(self, cmd):
         """
         Remove OSDs
         :cmd: Arguments for removing the OSDs
         """
-
-        #Launch the operation in the orchestrator module
-        completion = self.remove_osds(cmd["osd_ids"])
-
-        #Wait until the operation finishes
+        completion = self.remove_osds(cmd["svc_id"])
         self._orchestrator_wait([completion])
-
-        #return result
-        return HandleCommandResult(stdout=completion.result)
+        return HandleCommandResult(stdout=str(completion.result))
 
     def _add_stateless_svc(self, svc_type, spec):
         completion = self.add_stateless_service(svc_type, spec)
@@ -334,9 +307,6 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
         self._orchestrator_wait([completion])
         return HandleCommandResult()
 
-    def _osd_rm(self, cmd):
-        return self._rm_stateless_svc("osd", cmd['svc_id'])
-
     def _mds_rm(self, cmd):
         return self._rm_stateless_svc("mds", cmd['svc_id'])
 
@@ -441,15 +411,13 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
         except NotImplementedError:
             return HandleCommandResult(-errno.EINVAL, stderr="Command not found")
 
-    def _handle_command(self, _, cmd):
+    def _handle_command(self, inbuf, cmd):
         if cmd['prefix'] == "orchestrator device ls":
             return self._list_devices(cmd)
         elif cmd['prefix'] == "orchestrator service ls":
             return self._list_services(cmd)
         elif cmd['prefix'] == "orchestrator service status":
             return self._list_services(cmd)  # TODO: create more detailed output
-        elif cmd['prefix'] == "orchestrator osd add":
-            return self._osd_add(cmd)
         elif cmd['prefix'] == "orchestrator osd rm":
             return self._osd_rm(cmd)
         elif cmd['prefix'] == "orchestrator mds add":
@@ -473,7 +441,7 @@ class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
         elif cmd['prefix'] == "orchestrator status":
             return self._status()
         elif cmd['prefix'] == "orchestrator osd create":
-            return self._create_osd(_, cmd)
+            return self._create_osd(inbuf, cmd)
         elif cmd['prefix'] == "orchestrator osd remove":
             return self._remove_osd(cmd)
         else:
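
For reference, the inline-spec branch boils down to two splits; a tiny
standalone sketch (parse_svc_arg is a hypothetical helper mirroring the
parsing in _create_osd above):

    def parse_svc_arg(svc_arg):
        # "host:dev1,dev2,..." -> (host, [dev1, dev2, ...])
        node_name, block_device = svc_arg.split(":")
        return node_name, block_device.split(",")

    assert parse_svc_arg("node1:/dev/sda,/dev/sdb") == \
        ("node1", ["/dev/sda", "/dev/sdb"])
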
index a33e2d6cd83144172ac8f846c2293997ff7f8568..b0c8742b551ce2b832f1c822648ab00fd53cba30 100644 (file)
@@ -2,7 +2,7 @@ from __future__ import absolute_import
 import pytest
 
 
-from orchestrator import DriveGroupSpec, DeviceSelection
+from orchestrator import DriveGroupSpec, DeviceSelection, DriveGroupValidationError
 
 
 def test_DriveGroup():
@@ -31,6 +31,6 @@ def test_drive_selection():
     spec = DriveGroupSpec('node_name', data_devices=devs)
     assert spec.data_devices.paths == ['/dev/sda']
 
-    with pytest.raises(TypeError, match='exclusive'):
+    with pytest.raises(DriveGroupValidationError, match='exclusive'):
         DeviceSelection(paths=['/dev/sda'], rotates=False)
 
index cbe32695eee4275dc36d86e0f34d9f4c4e21445e..be7fbd57c5ae0adca03919c9ca4fa61d3d4abeeb 100644 (file)
@@ -26,6 +26,9 @@ class TestReadCompletion(orchestrator.ReadCompletion):
         global all_completions
         all_completions.append(self)
 
+    def __str__(self):
+        return "TestReadCompletion(result={} message={})".format(self.result, self.message)
+
     @property
     def result(self):
         return self._result
@@ -40,10 +44,9 @@ class TestReadCompletion(orchestrator.ReadCompletion):
 
 
 class TestWriteCompletion(orchestrator.WriteCompletion):
-    def __init__(self, execute_cb, complete_cb, message):
+    def __init__(self, execute_cb, message):
         super(TestWriteCompletion, self).__init__()
         self.execute_cb = execute_cb
-        self.complete_cb = complete_cb
 
         # Executed means I executed my API call, it may or may
         # not have succeeded
@@ -63,6 +66,13 @@ class TestWriteCompletion(orchestrator.WriteCompletion):
         global all_completions
         all_completions.append(self)
 
+    def __str__(self):
+        return "TestWriteCompletion(executed={} result={} id={} message={} error={})".format(self.executed, self._result, self.id, self.message,  self.error)
+
+    @property
+    def result(self):
+        return self._result
+
     @property
     def is_persistent(self):
         return (not self.is_errored) and self.executed
@@ -79,13 +89,18 @@ class TestWriteCompletion(orchestrator.WriteCompletion):
         if not self.executed:
             self._result = self.execute_cb()
             self.executed = True
+            self.effective = True
 
-        if not self.effective:
-            # TODO: check self.result for API errors
-            if self.complete_cb is None:
-                self.effective = True
-            else:
-                self.effective = self.complete_cb()
+
+def deferred_write(message):
+    def wrapper(f):
+        @functools.wraps(f)
+        def inner(*args, **kwargs):
+            args[0].log.warning(message)
+            return TestWriteCompletion(lambda: f(*args, **kwargs),
+                                       '{}, args={}, kwargs={}'.format(message, args, kwargs))
+        return inner
+    return wrapper
 
 
 def deferred_read(f):
@@ -246,12 +261,12 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
     def add_stateless_service(self, service_type, spec):
         raise NotImplementedError(service_type)
 
+    @deferred_write("create_osds")
     def create_osds(self, drive_group, all_hosts):
-        raise NotImplementedError(str(drive_group))
+        drive_group.validate(all_hosts)
+
 
+    @deferred_write("service_action")
     def service_action(self, action, service_type, service_name=None, service_id=None):
-        return TestWriteCompletion(
-            lambda: True, None,
-            "Pretending to {} service {} (name={}, id={})".format(
-                action, service_type, service_name, service_id))
+        pass
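
With the reworked test completions, deferred_write wraps the decorated method
body in a lambda, so nothing runs until the completion is driven by the
orchestrator wait loop. Roughly (test_orch is a hypothetical TestOrchestrator
instance, and the wait loop is assumed to trigger the execute step shown above):

    completion = test_orch.create_osds(drive_group, all_hosts)
    # Nothing has executed yet: create_osds only returned a TestWriteCompletion
    # whose execute_cb is `lambda: drive_group.validate(all_hosts)`.
    # After the wait loop drives it: executed=True, effective=True, _result=None.
    print(completion)
    # TestWriteCompletion(executed=True result=None id=... message=create_osds, ...)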