+import json
import logging
+from tempfile import NamedTemporaryFile
+
+from teuthology.exceptions import CommandFailedError
from mgr_test_case import MgrTestCase
MGRS_REQUIRED = 1
def _orch_cmd(self, *args):
- retstr = self.mgr_cluster.mon_manager.raw_cluster_cmd("orchestrator", *args)
- return retstr
+ return self.mgr_cluster.mon_manager.raw_cluster_cmd("orchestrator", *args)
+
+ def _orch_cmd_result(self, *args, **kwargs):
+ """
+ superfluous, but raw_cluster_cmd doesn't support kwargs.
+ """
+ res = self.mgr_cluster.mon_manager.raw_cluster_cmd_result("orchestrator", *args, **kwargs)
+ self.assertEqual(res, 0)
+
def setUp(self):
super(TestOrchestratorCli, self).setUp()
self._orch_cmd("service-instance", "reload", "mds", "a")
self._orch_cmd("service-instance", "stop", "mds", "a")
self._orch_cmd("service-instance", "start", "mds", "a")
+
+    def test_osd_create(self):
+        # Smoke-test "orchestrator osd create" with host:device specs
+        # (single device, then a device list).
+        self._orch_cmd("osd", "create", "*:device")
+        self._orch_cmd("osd", "create", "*:device,device2")
+
+        drive_group = {
+            "host_pattern": "*",
+            "data_devices": {"paths": ["/dev/sda"]}
+        }
+
+        # JSON Drive Group passed on stdin via "-i -".
+        self._orch_cmd_result("osd", "create", "-i", "-", stdin=json.dumps(drive_group))
+
+        # A host pattern that matches no host must fail validation.
+        with self.assertRaises(CommandFailedError):
+            self._orch_cmd("osd", "create", "notfound:device")
+
return ansible_operation
- def create_osds(self, drive_group, all_hosts=None):
+ def create_osds(self, drive_group, all_hosts):
"""Create one or more OSDs within a single Drive Group.
         If no host is provided, the operation affects all hosts in the OSDS role
Invoke a method on another module. All arguments, and the return
value from the other module must be serializable.
+ Limitation: Do not import any modules within the called method.
+ Otherwise you will get an error in Python 2::
+
+ RuntimeError('cannot unmarshal code objects in restricted execution mode',)
+
+
+
:param module_name: Name of other module. If module isn't loaded,
an ImportError exception is raised.
:param method_name: Method name. If it does not exist, a NameError
Please see the ceph-mgr module developer's guide for more information.
"""
+import six
+
try:
from typing import TypeVar, Generic, List, Optional, Union
T = TypeVar('T')
T, G = object, object
import time
+import fnmatch
class _Completion(G):
assert not (service_name and service_id)
raise NotImplementedError()
- def create_osds(self, drive_group, all_hosts=None):
+ def create_osds(self, drive_group, all_hosts):
# type: (DriveGroupSpec, List[str]) -> WriteCompletion
"""
Create one or more OSDs within a single Drive Group.
#: List of absolute paths to the devices.
self.paths = paths # type: List[str]
- if self.paths and any(p is not None for p in [id_model, size, rotates, count]):
- raise TypeError('`paths` and other parameters are mutually exclusive')
#: A wildcard string. e.g: "SDD*"
self.id_model = id_model
#: if this is present limit the number of drives to this number.
self.count = count
+ self.validate()
+
+    def validate(self):
+        # `paths` is an explicit device list; it cannot be combined with the
+        # filter-style selectors (id_model/size/rotates/count), and a
+        # selection with no criteria at all is rejected.
+        props = [self.id_model, self.size, self.rotates, self.count]
+        if self.paths and any(p is not None for p in props):
+            raise DriveGroupValidationError('DeviceSelection: `paths` and other parameters are mutually exclusive')
+        if not any(p is not None for p in [self.paths] + props):
+            raise DriveGroupValidationError('DeviceSelection cannot be empty')
@classmethod
def from_json(cls, device_spec):
return cls(**device_spec)
+class DriveGroupValidationError(Exception):
+ """
+    Defining an exception here is a bit problematic, because you cannot
+    properly catch it if it was raised in a different mgr module.
+ """
+
+ def __init__(self, msg):
+ super(DriveGroupValidationError, self).__init__('Failed to validate Drive Group: ' + msg)
+
class DriveGroupSpec(object):
"""
Describe a drive group in the same form that ceph-volume
#: To fully utilize nvme devices multiple osds are required.
self.osds_per_device = osds_per_device
- assert objectstore in ('filestore', 'bluestore')
#: ``filestore`` or ``bluestore``
self.objectstore = objectstore
@classmethod
def from_json(self, json_drive_group):
"""
- Initialize and verify 'Drive group' structure
+ Initialize 'Drive group' structure
:param json_drive_group: A valid json string with a Drive Group
specification
return DriveGroupSpec(**args)
def hosts(self, all_hosts):
- import fnmatch
return fnmatch.filter(all_hosts, self.host_pattern)
+    def validate(self, all_hosts):
+        # Raise DriveGroupValidationError on any inconsistency so callers
+        # (the CLI handler) can map it to EINVAL instead of a backtrace.
+        if not isinstance(self.host_pattern, six.string_types):
+            raise DriveGroupValidationError('host_pattern must be of type string')
+
+        # Each configured device selection validates itself.
+        specs = [self.data_devices, self.db_devices, self.wal_devices, self.journal_devices]
+        for s in filter(None, specs):
+            s.validate()
+        if self.objectstore not in ('filestore', 'bluestore'):
+            raise DriveGroupValidationError("objectstore not in ('filestore', 'bluestore')")
+        if not self.hosts(all_hosts):
+            raise DriveGroupValidationError(
+                "host_pattern '{}' does not match any hosts".format(self.host_pattern))
+
class StatelessServiceSpec(object):
# Request to orchestrator for a group of stateless services
return inner
for meth in Orchestrator.__dict__:
- if not meth.startswith('_') and meth not in ['is_orchestrator_module', 'available']:
+ if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
setattr(cls, meth, shim(meth))
return cls
o = self._select_orchestrator()
except AttributeError:
o = self.remote('orchestrator_cli', '_select_orchestrator')
+ self.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(self.module_name, o, meth, args, kwargs))
return self.remote(o, meth, *args, **kwargs)
def _orchestrator_wait(self, completions):
import errno
import json
+
+try:
+ from typing import Dict
+except ImportError:
+ pass # just for type checking.
+
from mgr_module import MgrModule, HandleCommandResult
import orchestrator
"perm": "r"
},
{
- 'cmd': "orchestrator osd add "
- "name=svc_arg,type=CephString ",
- "desc": "Create an OSD service",
+ 'cmd': "orchestrator osd create "
+ "name=svc_arg,type=CephString,req=false ",
+ "desc": "Create an OSD service. Either --svc_arg=host:drives or -i <drive_group>",
"perm": "rw"
},
{
'cmd': "orchestrator osd rm "
- "name=svc_id,type=CephString ",
+ "name=svc_id,type=CephString,n=N ",
"desc": "Remove an OSD service",
"perm": "rw"
},
"cmd": "orchestrator status",
"desc": "Report configured backend and its status",
"perm": "r"
- },
- { 'cmd': "orchestrator osd create "
- "name=drive_group,type=CephString,req=false ",
- "desc": "OSD's creation following specification \
- in <drive_group> parameter or readed from -i <file> input.",
- "perm": "rw"
- },
- { 'cmd': "orchestrator osd remove "
- "name=osd_ids,type=CephInt,n=N ",
- "desc": "Remove Osd's",
- "perm": "rw"
}
]
return HandleCommandResult(stdout="\n".join(lines))
- def _osd_add(self, cmd):
- device_spec = cmd['svc_arg']
- try:
- node_name, block_device = device_spec.split(":")
- except TypeError:
- return HandleCommandResult(-errno.EINVAL,
- stderr="Invalid device spec, should be <node>:<device>")
-
- devs = orchestrator.DeviceSelection(paths=block_device)
- spec = orchestrator.DriveGroupSpec(node_name, data_devices=devs)
-
- # TODO: Remove this and make the orchestrator composable
- # or
- # Probably this should be moved to each of the orchestrators,
- # then we wouldn't need the "all_hosts" parameter at all.
- host_completion = self.get_hosts()
- self.wait([host_completion])
- all_hosts = [h.name for h in host_completion.result]
-
- completion = self.create_osds(spec, all_hosts=all_hosts)
- self._orchestrator_wait([completion])
+    def _create_osd(self, inbuf, cmd):
+        # type: (str, Dict[str, str]) -> HandleCommandResult
+        """Create one or more OSDs.
+
+        Accepts either a JSON Drive Group on stdin (``-i -``) or a
+        ``host:device1,device2,...`` spec in ``svc_arg``.
+        """
-        return HandleCommandResult()
+        usage = """
+Usage:
+  ceph orchestrator osd create -i <json_file>
+  ceph orchestrator osd create host:device1,device2,...
+"""
-    def _create_osd(self, inbuf, cmd):
-        """Create one or more OSDs
+        if inbuf:
+            try:
+                drive_group = orchestrator.DriveGroupSpec.from_json(json.loads(inbuf))
+            except ValueError as e:
+                msg = 'Failed to read JSON input: {}'.format(str(e)) + usage
+                return HandleCommandResult(-errno.EINVAL, stderr=msg)
-        :cmd : Arguments for the create osd
-        """
-        #Obtain/validate parameters for the operation
-        cmdline_error = ""
-        if "drive_group" in cmd.keys():
-            params = cmd["drive_group"]
-        elif inbuf:
-            params = inbuf
         else:
-            cmdline_error = "Please, use 'drive_group' parameter \
-                             or specify -i <input file>"
+            try:
+                node_name, block_device = cmd['svc_arg'].split(":")
+                block_devices = block_device.split(',')
+            except (TypeError, KeyError, ValueError):
+                # cmd.get() here: 'svc_arg' is req=false and may be absent;
+                # cmd['svc_arg'] would re-raise KeyError inside this handler.
+                msg = "Invalid host:device spec: '{}'".format(cmd.get('svc_arg')) + usage
+                return HandleCommandResult(-errno.EINVAL, stderr=msg)
-        if cmdline_error:
-            return HandleCommandResult(-errno.EINVAL, stderr=cmdline_error)
+            devs = orchestrator.DeviceSelection(paths=block_devices)
+            drive_group = orchestrator.DriveGroupSpec(node_name, data_devices=devs)
-        try:
-            json_dg = json.loads(params)
-        except ValueError as msg:
-            return HandleCommandResult(-errno.EINVAL, stderr=msg)
-
-        # Create the drive group
-        drive_group = orchestrator.DriveGroupSpec.from_json(json_dg)
-        #Read other Drive_group
+        # TODO: Remove this and make the orchestrator composable
+        #       Like a future or so.
+        host_completion = self.get_hosts()
+        self._orchestrator_wait([host_completion])
+        all_hosts = [h.name for h in host_completion.result]
-        #Launch the operation in the orchestrator module
-        completion = self.create_osds(drive_group)
+        try:
+            # Validate before dispatching, so bad specs fail with EINVAL.
+            drive_group.validate(all_hosts)
+        except orchestrator.DriveGroupValidationError as e:
+            return HandleCommandResult(-errno.EINVAL, stderr=str(e))
-        #Wait until the operation finishes
+        completion = self.create_osds(drive_group, all_hosts)
         self._orchestrator_wait([completion])
+        # debug, not warning: this is the normal success path.
+        self.log.debug(str(completion.result))
+        return HandleCommandResult(stdout=str(completion.result))
- #return result
- return HandleCommandResult(stdout=completion.result)
-
- def _remove_osd(self, cmd):
+ def _osd_rm(self, cmd):
"""
Remove OSD's
:cmd : Arguments for remove the osd
"""
-
- #Launch the operation in the orchestrator module
- completion = self.remove_osds(cmd["osd_ids"])
-
- #Wait until the operation finishes
+ completion = self.remove_osds(cmd["svc_id"])
self._orchestrator_wait([completion])
-
- #return result
- return HandleCommandResult(stdout=completion.result)
+ return HandleCommandResult(stdout=str(completion.result))
def _add_stateless_svc(self, svc_type, spec):
completion = self.add_stateless_service(svc_type, spec)
self._orchestrator_wait([completion])
return HandleCommandResult()
- def _osd_rm(self, cmd):
- return self._rm_stateless_svc("osd", cmd['svc_id'])
-
def _mds_rm(self, cmd):
return self._rm_stateless_svc("mds", cmd['svc_id'])
except NotImplementedError:
return HandleCommandResult(-errno.EINVAL, stderr="Command not found")
- def _handle_command(self, _, cmd):
+ def _handle_command(self, inbuf, cmd):
if cmd['prefix'] == "orchestrator device ls":
return self._list_devices(cmd)
elif cmd['prefix'] == "orchestrator service ls":
return self._list_services(cmd)
elif cmd['prefix'] == "orchestrator service status":
return self._list_services(cmd) # TODO: create more detailed output
- elif cmd['prefix'] == "orchestrator osd add":
- return self._osd_add(cmd)
elif cmd['prefix'] == "orchestrator osd rm":
return self._osd_rm(cmd)
elif cmd['prefix'] == "orchestrator mds add":
elif cmd['prefix'] == "orchestrator status":
return self._status()
elif cmd['prefix'] == "orchestrator osd create":
- return self._create_osd(_, cmd)
+ return self._create_osd(inbuf, cmd)
elif cmd['prefix'] == "orchestrator osd remove":
return self._remove_osd(cmd)
else:
import pytest
-from orchestrator import DriveGroupSpec, DeviceSelection
+from orchestrator import DriveGroupSpec, DeviceSelection, DriveGroupValidationError
def test_DriveGroup():
spec = DriveGroupSpec('node_name', data_devices=devs)
assert spec.data_devices.paths == ['/dev/sda']
- with pytest.raises(TypeError, match='exclusive'):
+ with pytest.raises(DriveGroupValidationError, match='exclusive'):
DeviceSelection(paths=['/dev/sda'], rotates=False)
global all_completions
all_completions.append(self)
+ def __str__(self):
+ return "TestReadCompletion(result={} message={})".format(self.result, self.message)
+
+
@property
def result(self):
return self._result
class TestWriteCompletion(orchestrator.WriteCompletion):
- def __init__(self, execute_cb, complete_cb, message):
+ def __init__(self, execute_cb, message):
super(TestWriteCompletion, self).__init__()
self.execute_cb = execute_cb
- self.complete_cb = complete_cb
# Executed means I executed my API call, it may or may
# not have succeeded
global all_completions
all_completions.append(self)
+ def __str__(self):
+ return "TestWriteCompletion(executed={} result={} id={} message={} error={})".format(self.executed, self._result, self.id, self.message, self.error)
+
+ @property
+ def result(self):
+ return self._result
+
@property
def is_persistent(self):
return (not self.is_errored) and self.executed
if not self.executed:
self._result = self.execute_cb()
self.executed = True
+ self.effective = True
- if not self.effective:
- # TODO: check self.result for API errors
- if self.complete_cb is None:
- self.effective = True
- else:
- self.effective = self.complete_cb()
+
+def deferred_write(message):
+    """Decorator factory: wrap a method so the real call is deferred
+    into a TestWriteCompletion and executed lazily."""
+    def wrapper(f):
+        @functools.wraps(f)
+        def inner(*args, **kwargs):
+            # Log the message itself; the previous code concatenated the
+            # literal 'message' onto it (e.g. "messagecreate_osds").
+            args[0].log.warning(message)
+            return TestWriteCompletion(lambda: f(*args, **kwargs),
+                                       '{}, args={}, kwargs={}'.format(message, args, kwargs))
+        return inner
+    return wrapper
def deferred_read(f):
def add_stateless_service(self, service_type, spec):
raise NotImplementedError(service_type)
+ @deferred_write("create_osds")
def create_osds(self, drive_group, all_hosts):
- raise NotImplementedError(str(drive_group))
+ drive_group.validate(all_hosts)
+
+ @deferred_write("service_action")
def service_action(self, action, service_type, service_name=None, service_id=None):
- return TestWriteCompletion(
- lambda: True, None,
- "Pretending to {} service {} (name={}, id={})".format(
- action, service_type, service_name, service_id))
+ pass