Please see the ceph-mgr module developer's guide for more information.
"""
-import six
-
-from mgr_util import format_bytes
+import sys
+import time
+import fnmatch
try:
from typing import TypeVar, Generic, List, Optional, Union, Tuple
except ImportError:
T, G = object, object
-import time
-import fnmatch
+import six
+
+from mgr_util import format_bytes
+
+
+class OrchestratorError(Exception):
+ """
+ General orchestrator specific error.
+
+ Used for deployment, configuration or user errors.
+
+ It's not intended for programming errors or orchestrator internal errors.
+ """
+
+
+class NoOrchestrator(OrchestratorError):
+ """
+ No orchestrator in configured.
+ """
+ def __init__(self, msg="No orchestrator configured (try `ceph orchestrator set backend`)"):
+ super(NoOrchestrator, self).__init__(msg)
+
+
+class OrchestratorValidationError(OrchestratorError):
+ """
+ Raised when an orchestrator doesn't support a specific feature.
+ """
-class NoOrchestrator(Exception):
- def __init__(self):
- super(NoOrchestrator, self).__init__("No orchestrator configured (try "
- "`ceph orchestrator set backend`)")
class _Completion(G):
@property
"""
raise NotImplementedError()
+ @property
+ def exception(self):
+ # type: () -> Optional[Exception]
+ """
+ Holds an exception object.
+ """
+ try:
+ return self.__exception
+ except AttributeError:
+ return None
+
+ @exception.setter
+ def exception(self, value):
+ self.__exception = value
+
@property
def is_read(self):
# type: () -> bool
@property
def is_errored(self):
# type: () -> bool
- raise NotImplementedError()
+ """
+ Has the completion failed. Default implementation looks for
+ self.exception. Can be overwritten.
+ """
+ return self.exception is not None
@property
def should_wait(self):
raise NotImplementedError()
+def raise_if_exception(c):
+ # type: (_Completion) -> None
+ """
+ :raises OrchestratorError: Some user error or a config error.
+ :raises Exception: Some internal error
+ """
+ def copy_to_this_subinterpreter(r_obj):
+ # This is something like `return pickle.loads(pickle.dumps(r_obj))`
+ # Without importing anything.
+ r_cls = r_obj.__class__
+ if r_cls.__module__ == '__builtin__':
+ return r_obj
+ my_cls = getattr(sys.modules[r_cls.__module__], r_cls.__name__)
+ if id(my_cls) == id(r_cls):
+ return r_obj
+ my_obj = my_cls.__new__(my_cls)
+ for k,v in r_obj.__dict__.items():
+ setattr(my_obj, k, copy_to_this_subinterpreter(v))
+ return my_obj
+
+ if c.exception is not None:
+ raise copy_to_this_subinterpreter(c.exception)
+
+
class ReadCompletion(_Completion):
"""
``Orchestrator`` implementations should inherit from this
@_mk_orch_methods
class OrchestratorClientMixin(Orchestrator):
+ """
+ A module that inherents from `OrchestratorClientMixin` can directly call
+ all :class:`Orchestrator` methods without manually calling remote.
+
+ Every interface method from ``Orchestrator`` is converted into a stub method that internally
+ calls :func:`OrchestratorClientMixin._oremote`
+
+ >>> class MyModule(OrchestratorClientMixin):
+ ... def func(self):
+ ... completion = self.add_host('somehost') # calls `_oremote()`
+ ... self._orchestrator_wait([completion])
+ ... self.log.debug(completion.result)
+
+ """
def _oremote(self, meth, args, kwargs):
"""
Helper for invoking `remote` on whichever orchestrator is enabled
+
+ :raises RuntimeError: If the remote method failed.
+ :raises NoOrchestrator:
+ :raises ImportError: no `orchestrator_cli` module or backend not found.
"""
try:
o = self._select_orchestrator()
def _orchestrator_wait(self, completions):
# type: (List[_Completion]) -> None
"""
- Helper to wait for completions to complete (reads) or
+ Wait for completions to complete (reads) or
become persistent (writes).
Waits for writes to be *persistent* but not *effective*.
+
+ :param completions: List of Completions
+ :raises NoOrchestrator:
+ :raises ImportError: no `orchestrator_cli` module or backend not found.
"""
while not self.wait(completions):
if any(c.should_wait for c in completions):
time.sleep(5)
else:
break
-
- if all(hasattr(c, 'error') and getattr(c, 'error') for c in completions):
- raise Exception([getattr(c, 'error') for c in completions])
from functools import wraps
-from mgr_module import MgrModule, HandleCommandResult, CLIWriteCommand, CLIReadCommand
+from mgr_module import MgrModule, HandleCommandResult, CLICommand
import orchestrator
-def handle_exceptions(func):
-
+def handle_exception(prefix, cmd_args, desc, perm, func):
@wraps(func)
- def inner(*args, **kwargs):
+ def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
- except (orchestrator.NoOrchestrator, ImportError) as e:
+ except (orchestrator.OrchestratorError, ImportError) as e:
+ # Do not print Traceback for expected errors.
return HandleCommandResult(-errno.ENOENT, stderr=str(e))
- return inner
+ except NotImplementedError:
+ msg = 'This Orchestrator does not support `{}`'.format(prefix)
+ return HandleCommandResult(-errno.ENOENT, stderr=msg)
+
+ return CLICommand(prefix, cmd_args, desc, perm)(wrapper)
+
+
+def _cli_command(perm):
+ def inner_cli_command(prefix, cmd_args="", desc=""):
+ return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
+ return inner_cli_command
+
+_read_cli = _cli_command('r')
+_write_cli = _cli_command('rw')
class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
MODULE_OPTIONS = [
def _select_orchestrator(self):
return self.get_module_option("orchestrator")
- @CLIWriteCommand('orchestrator host add',
- "name=host,type=CephString,req=true",
- 'Add a host')
- @handle_exceptions
+ @_write_cli('orchestrator host add',
+ "name=host,type=CephString,req=true",
+ 'Add a host')
def _add_host(self, host):
completion = self.add_host(host)
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
return HandleCommandResult(stdout=str(completion.result))
- @CLIWriteCommand('orchestrator host rm',
- "name=host,type=CephString,req=true",
- 'Remove a host')
- @handle_exceptions
+ @_write_cli('orchestrator host rm',
+ "name=host,type=CephString,req=true",
+ 'Remove a host')
def _remove_host(self, host):
completion = self.remove_host(host)
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
return HandleCommandResult(stdout=str(completion.result))
- @CLIReadCommand('orchestrator host ls',
- desc='List hosts')
- @handle_exceptions
+ @_read_cli('orchestrator host ls',
+ desc='List hosts')
def _get_hosts(self):
completion = self.get_hosts()
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
result = "\n".join(map(lambda node: node.name, completion.result))
return HandleCommandResult(stdout=result)
- @CLIReadCommand('orchestrator device ls',
- "name=host,type=CephString,n=N,req=false "
- "name=format,type=CephChoices,strings=json|plain,req=false "
- "name=refresh,type=CephBool,req=false",
- 'List devices on a node')
- @handle_exceptions
+ @_read_cli('orchestrator device ls',
+ "name=host,type=CephString,n=N,req=false "
+ "name=format,type=CephChoices,strings=json|plain,req=false "
+ "name=refresh,type=CephBool,req=false",
+ 'List devices on a node')
def _list_devices(self, host=None, format='plain', refresh=False):
# type: (List[str], str, bool) -> HandleCommandResult
"""
completion = self.get_inventory(node_filter=nf, refresh=refresh)
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
if format == 'json':
data = [n.to_json() for n in completion.result]
return HandleCommandResult(stdout=result)
- @CLIReadCommand('orchestrator service ls',
- "name=host,type=CephString,req=false "
- "name=svc_type,type=CephChoices,strings=mon|mgr|osd|mds|nfs|rgw|rbd-mirror,req=false "
- "name=svc_id,type=CephString,req=false "
- "name=format,type=CephChoices,strings=json|plain,req=false",
- 'List services known to orchestrator')
- @handle_exceptions
+ @_read_cli('orchestrator service ls',
+ "name=host,type=CephString,req=false "
+ "name=svc_type,type=CephChoices,strings=mon|mgr|osd|mds|nfs|rgw|rbd-mirror,req=false "
+ "name=svc_id,type=CephString,req=false "
+ "name=format,type=CephChoices,strings=json|plain,req=false",
+ 'List services known to orchestrator')
def _list_services(self, host=None, svc_type=None, svc_id=None, format='plain'):
# XXX this is kind of confusing for people because in the orchestrator
# context the service ID for MDS is the filesystem ID, not the daemon ID
completion = self.describe_service(svc_type, svc_id, host)
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
services = completion.result
# Sort the list for display
return HandleCommandResult(stdout="\n".join(lines))
- @CLIWriteCommand('orchestrator osd create',
- "name=svc_arg,type=CephString,req=false",
- 'Create an OSD service. Either --svc_arg=host:drives or -i <drive_group>')
- @handle_exceptions
+ @_write_cli('orchestrator osd create',
+ "name=svc_arg,type=CephString,req=false",
+ 'Create an OSD service. Either --svc_arg=host:drives or -i <drive_group>')
def _create_osd(self, svc_arg=None, inbuf=None):
# type: (str, str) -> HandleCommandResult
"""Create one or more OSDs"""
# Like a future or so.
host_completion = self.get_hosts()
self._orchestrator_wait([host_completion])
+ orchestrator.raise_if_exception(host_completion)
all_hosts = [h.name for h in host_completion.result]
try:
completion = self.create_osds(drive_group, all_hosts)
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
self.log.warning(str(completion.result))
return HandleCommandResult(stdout=str(completion.result))
- @CLIWriteCommand('orchestrator osd rm',
- "name=svc_id,type=CephString,n=N",
- 'Remove OSD services')
- @handle_exceptions
+ @_write_cli('orchestrator osd rm',
+ "name=svc_id,type=CephString,n=N",
+ 'Remove OSD services')
def _osd_rm(self, svc_id):
# type: (List[str]) -> HandleCommandResult
"""
"""
completion = self.remove_osds(svc_id)
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
return HandleCommandResult(stdout=str(completion.result))
def _add_stateless_svc(self, svc_type, spec):
completion = self.add_stateless_service(svc_type, spec)
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
return HandleCommandResult()
- @CLIWriteCommand('orchestrator mds add',
- "name=svc_arg,type=CephString",
- 'Create an MDS service')
- @handle_exceptions
+ @_write_cli('orchestrator mds add',
+ "name=svc_arg,type=CephString",
+ 'Create an MDS service')
def _mds_add(self, svc_arg):
spec = orchestrator.StatelessServiceSpec()
spec.name = svc_arg
return self._add_stateless_svc("mds", spec)
- @CLIWriteCommand('orchestrator rgw add',
- "name=svc_arg,type=CephString",
- 'Create an RGW service')
- @handle_exceptions
+ @_write_cli('orchestrator rgw add',
+ "name=svc_arg,type=CephString",
+ 'Create an RGW service')
def _rgw_add(self, svc_arg):
spec = orchestrator.StatelessServiceSpec()
spec.name = svc_arg
return self._add_stateless_svc("rgw", spec)
- @CLIWriteCommand('orchestrator nfs add',
- "name=svc_arg,type=CephString "
- "name=pool,type=CephString "
- "name=namespace,type=CephString,req=false",
- 'Create an NFS service')
- @handle_exceptions
+ @_write_cli('orchestrator nfs add',
+ "name=svc_arg,type=CephString "
+ "name=pool,type=CephString "
+ "name=namespace,type=CephString,req=false",
+ 'Create an NFS service')
def _nfs_add(self, svc_arg, pool, namespace=None):
spec = orchestrator.StatelessServiceSpec()
spec.name = svc_arg
def _rm_stateless_svc(self, svc_type, svc_id):
completion = self.remove_stateless_service(svc_type, svc_id)
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
return HandleCommandResult()
- @CLIWriteCommand('orchestrator mds rm',
- "name=svc_id,type=CephString",
- 'Remove an MDS service')
+ @_write_cli('orchestrator mds rm',
+ "name=svc_id,type=CephString",
+ 'Remove an MDS service')
def _mds_rm(self, svc_id):
return self._rm_stateless_svc("mds", svc_id)
- @handle_exceptions
- @CLIWriteCommand('orchestrator rgw rm',
- "name=svc_id,type=CephString",
- 'Remove an RGW service')
+ @_write_cli('orchestrator rgw rm',
+ "name=svc_id,type=CephString",
+ 'Remove an RGW service')
def _rgw_rm(self, svc_id):
return self._rm_stateless_svc("rgw", svc_id)
- @CLIWriteCommand('orchestrator nfs rm',
- "name=svc_id,type=CephString",
- 'Remove an NFS service')
- @handle_exceptions
+ @_write_cli('orchestrator nfs rm',
+ "name=svc_id,type=CephString",
+ 'Remove an NFS service')
def _nfs_rm(self, svc_id):
return self._rm_stateless_svc("nfs", svc_id)
- @CLIWriteCommand('orchestrator nfs update',
- "name=svc_id,type=CephString "
- "name=num,type=CephInt",
- 'Scale an NFS service')
- @handle_exceptions
+ @_write_cli('orchestrator nfs update',
+ "name=svc_id,type=CephString "
+ "name=num,type=CephInt",
+ 'Scale an NFS service')
def _nfs_update(self, svc_id, num):
spec = orchestrator.StatelessServiceSpec()
spec.name = svc_id
self._orchestrator_wait([completion])
return HandleCommandResult()
- @CLIWriteCommand('orchestrator service',
- "name=action,type=CephChoices,strings=start|stop|reload "
- "name=svc_type,type=CephString "
- "name=svc_name,type=CephString",
- 'Start, stop or reload an entire service (i.e. all daemons)')
- @handle_exceptions
+ @_write_cli('orchestrator service',
+ "name=action,type=CephChoices,strings=start|stop|reload "
+ "name=svc_type,type=CephString "
+ "name=svc_name,type=CephString",
+ 'Start, stop or reload an entire service (i.e. all daemons)')
def _service_action(self, action, svc_type, svc_name):
completion = self.service_action(action, svc_type, service_name=svc_name)
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
return HandleCommandResult()
- @CLIWriteCommand('orchestrator service-instance',
- "name=action,type=CephChoices,strings=start|stop|reload "
- "name=svc_type,type=CephString "
- "name=svc_id,type=CephString",
- 'Start, stop or reload a specific service instance')
- @handle_exceptions
+ @_write_cli('orchestrator service-instance',
+ "name=action,type=CephChoices,strings=start|stop|reload "
+ "name=svc_type,type=CephString "
+ "name=svc_id,type=CephString",
+ 'Start, stop or reload a specific service instance')
def _service_instance_action(self, action, svc_type, svc_id):
completion = self.service_action(action, svc_type, service_id=svc_id)
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
return HandleCommandResult()
- @CLIWriteCommand('orchestrator mgr update',
- "name=num,type=CephInt,req=true "
- "name=hosts,type=CephString,n=N,req=false",
- 'Update the number of manager instances')
- @handle_exceptions
+ @_write_cli('orchestrator mgr update',
+ "name=num,type=CephInt,req=true "
+ "name=hosts,type=CephString,n=N,req=false",
+ 'Update the number of manager instances')
def _update_mgrs(self, num, hosts=None):
hosts = hosts if hosts is not None else []
completion = self.update_mgrs(num, hosts)
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
return HandleCommandResult(stdout=str(completion.result))
- @CLIWriteCommand('orchestrator mon update',
- "name=num,type=CephInt,req=true "
- "name=hosts,type=CephString,n=N,req=false",
- 'Update the number of monitor instances')
- @handle_exceptions
+ @_write_cli('orchestrator mon update',
+ "name=num,type=CephInt,req=true "
+ "name=hosts,type=CephString,n=N,req=false",
+ 'Update the number of monitor instances')
def _update_mons(self, num, hosts=None):
hosts = hosts if hosts is not None else []
completion = self.update_mons(num, hosts)
self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
return HandleCommandResult(stdout=str(completion.result))
- @CLIWriteCommand('orchestrator set backend',
- "name=module_name,type=CephString,req=true",
- 'Select orchestrator module backend')
- @handle_exceptions
+ @_write_cli('orchestrator set backend',
+ "name=module_name,type=CephString,req=true",
+ 'Select orchestrator module backend')
def _set_backend(self, module_name):
"""
We implement a setter command instead of just having the user
return HandleCommandResult(-errno.EINVAL, stderr="Module '{0}' not found".format(module_name))
- @CLIReadCommand('orchestrator status',
- desc='Report configured backend and its status')
- @handle_exceptions
+ @_read_cli('orchestrator status',
+ desc='Report configured backend and its status')
def _status(self):
o = self._select_orchestrator()
if o is None:
import threading
import functools
import uuid
-from subprocess import check_output
+from subprocess import check_output, CalledProcessError
from mgr_module import MgrModule
import orchestrator
-all_completions = []
-class TestReadCompletion(orchestrator.ReadCompletion):
+class TestCompletionMixin(object):
+ all_completions = [] # Hacky global
- def __init__(self, cb):
- super(TestReadCompletion, self).__init__()
+ def __init__(self, cb, message, *args, **kwargs):
+ super(TestCompletionMixin, self).__init__(*args, **kwargs)
self.cb = cb
self._result = None
self._complete = False
- self.message = "<read op>"
-
- global all_completions
- all_completions.append(self)
-
- def __str__(self):
- return "TestReadCompletion(result={} message={})".format(self.result, self.message)
+ self.message = message
+ TestCompletionMixin.all_completions.append(self)
@property
def result(self):
def execute(self):
self._result = self.cb()
+ self.executed = True
self._complete = True
+ def __str__(self):
+ return "{}(result={} message={}, exception={})".format(self.__class__.__name__, self.result,
+ self.message, self.exception)
-class TestWriteCompletion(orchestrator.WriteCompletion):
- def __init__(self, execute_cb, message):
- super(TestWriteCompletion, self).__init__()
- self.execute_cb = execute_cb
-
- # Executed means I executed my API call, it may or may
- # not have succeeded
- self.executed = False
- self._result = None
+class TestReadCompletion(TestCompletionMixin, orchestrator.ReadCompletion):
+ def __init__(self, cb):
+ super(TestReadCompletion, self).__init__(cb, "<read op>")
- self.effective = False
+class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion):
+ def __init__(self, cb, message):
+ super(TestWriteCompletion, self).__init__(cb, message)
self.id = str(uuid.uuid4())
- self.message = message
-
- self.error = None
-
- # XXX hacky global
- global all_completions
- all_completions.append(self)
-
- def __str__(self):
- return "TestWriteCompletion(executed={} result={} id={} message={} error={})".format(self.executed, self._result, self.id, self.message, self.error)
-
- @property
- def result(self):
- return self._result
-
@property
def is_persistent(self):
return (not self.is_errored) and self.executed
@property
def is_effective(self):
- return self.effective
-
- @property
- def is_errored(self):
- return self.error is not None
-
- def execute(self):
- if not self.executed:
- self._result = self.execute_cb()
- self.executed = True
- self.effective = True
+ return self._complete
def deferred_write(message):
self.log.exception("Completion {0} threw an exception:".format(
c.message
))
- c.error = e
+ c.exception = e
c._complete = True
if not c.is_read:
self._progress("complete", c.id)
# in case we had a caller that wait()'ed on them long enough
# to get persistence but not long enough to get completion
- global all_completions
- self.wait(all_completions)
- all_completions = [c for c in all_completions if not c.is_complete]
+ self.wait(TestCompletionMixin.all_completions)
+ TestCompletionMixin.all_completions = [c for c in TestCompletionMixin.all_completions if
+ not c.is_complete]
self._shutdown.wait(5)
cmd = """
. {tmpdir}/ceph-volume-virtualenv/bin/activate
ceph-volume inventory --format json
- """.format(tmpdir=os.environ.get('TMPDIR', '/tmp'))
- c_v_out = check_output(cmd, shell=True)
+ """
+ try:
+ c_v_out = check_output(cmd.format(tmpdir=os.environ.get('TMPDIR', '/tmp')), shell=True)
+ except (OSError, CalledProcessError):
+ c_v_out = check_output(cmd.format(tmpdir='.'),shell=True)
for out in c_v_out.splitlines():
if not out.startswith(b'-->') and not out.startswith(b' stderr'):
@deferred_write("add_host")
def add_host(self, host):
+ if host == 'raise_no_support':
+ raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
+ if host == 'raise_bug':
+ raise ZeroDivisionError()
+ if host == 'raise_not_implemented':
+ raise NotImplementedError()
+ if host == 'raise_no_orchestrator':
+ raise orchestrator.NoOrchestrator()
+ if host == 'raise_import_error':
+ raise ImportError("test_orchestrator not enabled")
assert isinstance(host, str)
@deferred_write("remove_host")