except ImportError:
pass # just for type checking.
-from mgr_module import MgrModule, HandleCommandResult
+from functools import wraps
+from typing import List
+
+from mgr_module import MgrModule, HandleCommandResult, CLIWriteCommand, CLIReadCommand
import orchestrator
class NoOrchestrator(Exception):
- pass
+ def __init__(self):
+ super(NoOrchestrator, self).__init__("No orchestrator configured (try "
+ "`ceph orchestrator set backend`)")
+
+
+def handle_exceptions(func):
+
+ @wraps(func)
+ def inner(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except (NoOrchestrator, ImportError) as e:
+ return HandleCommandResult(-errno.ENOENT, stderr=str(e))
+ return inner
class OrchestratorCli(orchestrator.OrchestratorClientMixin, MgrModule):
{'name': 'orchestrator'}
]
- COMMANDS = [
- {
- 'cmd': "orchestrator device ls "
- "name=host,type=CephString,req=false "
- "name=format,type=CephChoices,strings=json|plain,req=false ",
- "desc": "List devices on a node",
- "perm": "r"
- },
- {
- 'cmd': "orchestrator service ls "
- "name=host,type=CephString,req=false "
- "name=svc_type,type=CephString,req=false "
- "name=svc_id,type=CephString,req=false "
- "name=format,type=CephChoices,strings=json|plain,req=false ",
- "desc": "List services known to orchestrator" ,
- "perm": "r"
- },
- {
- 'cmd': "orchestrator service status "
- "name=host,type=CephString,req=false "
- "name=svc_type,type=CephString "
- "name=svc_id,type=CephString "
- "name=format,type=CephChoices,strings=json|plain,req=false ",
- "desc": "Get orchestrator state for Ceph service",
- "perm": "r"
- },
- {
- 'cmd': "orchestrator osd create "
- "name=svc_arg,type=CephString,req=false ",
- "desc": "Create an OSD service. Either --svc_arg=host:drives or -i <drive_group>",
- "perm": "rw"
- },
- {
- 'cmd': "orchestrator osd rm "
- "name=svc_id,type=CephString,n=N ",
- "desc": "Remove an OSD service",
- "perm": "rw"
- },
- {
- 'cmd': "orchestrator mds add "
- "name=svc_arg,type=CephString ",
- "desc": "Create an MDS service",
- "perm": "rw"
- },
- {
- 'cmd': "orchestrator mds rm "
- "name=svc_id,type=CephString ",
- "desc": "Remove an MDS service",
- "perm": "rw"
- },
- {
- 'cmd': "orchestrator rgw add "
- "name=svc_arg,type=CephString ",
- "desc": "Create an RGW service",
- "perm": "rw"
- },
- {
- 'cmd': "orchestrator rgw rm "
- "name=svc_id,type=CephString ",
- "desc": "Remove an RGW service",
- "perm": "rw"
- },
- {
- 'cmd': "orchestrator nfs add "
- "name=svc_arg,type=CephString "
- "name=pool,type=CephString "
- "name=namespace,type=CephString,req=false ",
- "desc": "Create an NFS service",
- "perm": "rw"
- },
- {
- 'cmd': "orchestrator nfs rm "
- "name=svc_id,type=CephString ",
- "desc": "Remove an NFS service",
- "perm": "rw"
- },
- {
- 'cmd': "orchestrator service "
- "name=action,type=CephChoices,"
- "strings=start|stop|reload "
- "name=svc_type,type=CephString "
- "name=svc_name,type=CephString",
- "desc": "Start, stop or reload an entire service (i.e. all daemons)",
- "perm": "rw"
- },
- {
- 'cmd': "orchestrator service-instance "
- "name=action,type=CephChoices,"
- "strings=start|stop|reload "
- "name=svc_type,type=CephString "
- "name=svc_id,type=CephString",
- "desc": "Start, stop or reload a specific service instance",
- "perm": "rw"
- },
- {
- 'cmd': "orchestrator set backend "
- "name=module,type=CephString,req=true",
- "desc": "Select orchestrator module backend",
- "perm": "rw"
- },
- {
- "cmd": "orchestrator status",
- "desc": "Report configured backend and its status",
- "perm": "r"
- }
- ]
-
def _select_orchestrator(self):
o = self.get_module_option("orchestrator")
if o is None:
return o
-
- def _list_devices(self, cmd):
+ @CLIReadCommand('orchestrator device ls',
+ "name=host,type=CephString,n=N,req=false "
+ "name=format,type=CephChoices,strings=json|plain,req=false",
+ 'List devices on a node')
+ @handle_exceptions
+ def _list_devices(self, host=None, format='plain'):
+ # type: (List[str], str) -> HandleCommandResult
"""
Provide information about storage devices present in cluster hosts
date hardware inventory is fine as long as hardware ultimately appears
in the output of this command.
"""
- host = cmd.get('host', None)
-
- if host:
- nf = orchestrator.InventoryFilter()
- nf.nodes = [host]
- else:
- nf = None
+ nf = orchestrator.InventoryFilter(nodes=host) if host else None
completion = self.get_inventory(node_filter=nf)
self._orchestrator_wait([completion])
- if cmd.get('format', 'plain') == 'json':
+ if format == 'json':
data = [n.to_json() for n in completion.result]
return HandleCommandResult(stdout=json.dumps(data))
else:
return HandleCommandResult(stdout=result)
- def _list_services(self, cmd):
- hostname = cmd.get('host', None)
- svc_id = cmd.get('svc_id', None)
- svc_type = cmd.get('svc_type', None)
+ @CLIReadCommand('orchestrator service ls',
+ "name=host,type=CephString,req=false "
+ "name=svc_type,type=CephChoices,strings=mon|mgr|osd|mds|nfs|rgw|rbd-mirror,req=false "
+ "name=svc_id,type=CephString,req=false "
+ "name=format,type=CephChoices,strings=json|plain,req=false",
+ 'List services known to orchestrator')
+ @handle_exceptions
+ def _list_services(self, host=None, svc_type=None, svc_id=None, format='plain'):
# XXX this is kind of confusing for people because in the orchestrator
# context the service ID for MDS is the filesystem ID, not the daemon ID
- completion = self.describe_service(svc_type, svc_id, hostname)
+ completion = self.describe_service(svc_type, svc_id, host)
self._orchestrator_wait([completion])
services = completion.result
if len(services) == 0:
return HandleCommandResult(stdout="No services reported")
- elif cmd.get('format', 'plain') == 'json':
+ elif format == 'json':
data = [s.to_json() for s in services]
return HandleCommandResult(stdout=json.dumps(data))
else:
return HandleCommandResult(stdout="\n".join(lines))
- def _create_osd(self, inbuf, cmd):
- # type: (str, Dict[str, str]) -> HandleCommandResult
+ @CLIWriteCommand('orchestrator osd create',
+ "name=svc_arg,type=CephString,req=false",
+ 'Create an OSD service. Either --svc_arg=host:drives or -i <drive_group>')
+ @handle_exceptions
+ def _create_osd(self, svc_arg=None, inbuf=None):
+ # type: (str, str) -> HandleCommandResult
"""Create one or more OSDs"""
usage = """
msg = 'Failed to read JSON input: {}'.format(str(e)) + usage
return HandleCommandResult(-errno.EINVAL, stderr=msg)
- else:
+ elif svc_arg:
try:
- node_name, block_device = cmd['svc_arg'].split(":")
+ node_name, block_device = svc_arg.split(":")
block_devices = block_device.split(',')
except (TypeError, KeyError, ValueError):
- msg = "Invalid host:device spec: '{}'".format(cmd['svc_arg']) + usage
+ msg = "Invalid host:device spec: '{}'".format(svc_arg) + usage
return HandleCommandResult(-errno.EINVAL, stderr=msg)
devs = orchestrator.DeviceSelection(paths=block_devices)
drive_group = orchestrator.DriveGroupSpec(node_name, data_devices=devs)
+ else:
+ return HandleCommandResult(-errno.EINVAL, stderr=usage)
# TODO: Remove this and make the orchestrator composable
# Like a future or so.
self.log.warning(str(completion.result))
return HandleCommandResult(stdout=str(completion.result))
- def _osd_rm(self, cmd):
+ @CLIWriteCommand('orchestrator osd rm',
+ "name=svc_id,type=CephString,n=N",
+ 'Remove OSD services')
+ @handle_exceptions
+ def _osd_rm(self, svc_id):
+ # type: (List[str]) -> HandleCommandResult
"""
Remove OSD's
:cmd : Arguments for remove the osd
"""
- completion = self.remove_osds(cmd["svc_id"])
+ completion = self.remove_osds(svc_id)
self._orchestrator_wait([completion])
return HandleCommandResult(stdout=str(completion.result))
self._orchestrator_wait([completion])
return HandleCommandResult()
- def _mds_add(self, cmd):
+ @CLIWriteCommand('orchestrator mds add',
+ "name=svc_arg,type=CephString",
+ 'Create an MDS service')
+ @handle_exceptions
+ def _mds_add(self, svc_arg):
spec = orchestrator.StatelessServiceSpec()
- spec.name = cmd['svc_arg']
+ spec.name = svc_arg
return self._add_stateless_svc("mds", spec)
- def _rgw_add(self, cmd):
+ @CLIWriteCommand('orchestrator rgw add',
+ "name=svc_arg,type=CephString",
+ 'Create an RGW service')
+ @handle_exceptions
+ def _rgw_add(self, svc_arg):
spec = orchestrator.StatelessServiceSpec()
- spec.name = cmd['svc_arg']
+ spec.name = svc_arg
return self._add_stateless_svc("rgw", spec)
- def _nfs_add(self, cmd):
- cluster_name = cmd['svc_arg']
- pool = cmd['pool']
- ns = cmd.get('namespace', None)
-
+ @CLIWriteCommand('orchestrator nfs add',
+ "name=svc_arg,type=CephString "
+ "name=pool,type=CephString "
+ "name=namespace,type=CephString,req=false",
+ 'Create an NFS service')
+ @handle_exceptions
+ def _nfs_add(self, svc_arg, pool, namespace=None):
spec = orchestrator.StatelessServiceSpec()
- spec.name = cluster_name
+ spec.name = svc_arg
spec.extended = { "pool":pool }
- if ns != None:
- spec.extended["namespace"] = ns
+ if namespace is not None:
+ spec.extended["namespace"] = namespace
return self._add_stateless_svc("nfs", spec)
def _rm_stateless_svc(self, svc_type, svc_id):
self._orchestrator_wait([completion])
return HandleCommandResult()
- def _mds_rm(self, cmd):
- return self._rm_stateless_svc("mds", cmd['svc_id'])
-
- def _rgw_rm(self, cmd):
- return self._rm_stateless_svc("rgw", cmd['svc_id'])
-
- def _nfs_rm(self, cmd):
- return self._rm_stateless_svc("nfs", cmd['svc_id'])
-
- def _service_action(self, cmd):
- action = cmd['action']
- svc_type = cmd['svc_type']
- svc_name = cmd['svc_name']
-
+ @CLIWriteCommand('orchestrator mds rm',
+ "name=svc_id,type=CephString",
+ 'Remove an MDS service')
+ def _mds_rm(self, svc_id):
+ return self._rm_stateless_svc("mds", svc_id)
+
+ @handle_exceptions
+ @CLIWriteCommand('orchestrator rgw rm',
+ "name=svc_id,type=CephString",
+ 'Remove an RGW service')
+ def _rgw_rm(self, svc_id):
+ return self._rm_stateless_svc("rgw", svc_id)
+
+ @CLIWriteCommand('orchestrator nfs rm',
+ "name=svc_id,type=CephString",
+ 'Remove an NFS service')
+ @handle_exceptions
+ def _nfs_rm(self, svc_id):
+ return self._rm_stateless_svc("nfs", svc_id)
+
+ @CLIWriteCommand('orchestrator service',
+ "name=action,type=CephChoices,strings=start|stop|reload "
+ "name=svc_type,type=CephString "
+ "name=svc_name,type=CephString",
+ 'Start, stop or reload an entire service (i.e. all daemons)')
+ @handle_exceptions
+ def _service_action(self, action, svc_type, svc_name):
completion = self.service_action(action, svc_type, service_name=svc_name)
self._orchestrator_wait([completion])
-
return HandleCommandResult()
- def _service_instance_action(self, cmd):
- action = cmd['action']
- svc_type = cmd['svc_type']
- svc_id = cmd['svc_id']
-
+ @CLIWriteCommand('orchestrator service-instance',
+ "name=action,type=CephChoices,strings=start|stop|reload "
+ "name=svc_type,type=CephString "
+ "name=svc_id,type=CephString",
+ 'Start, stop or reload a specific service instance')
+ @handle_exceptions
+ def _service_instance_action(self, action, svc_type, svc_id):
completion = self.service_action(action, svc_type, service_id=svc_id)
self._orchestrator_wait([completion])
-
return HandleCommandResult()
- def _set_backend(self, cmd):
+ @CLIWriteCommand('orchestrator set backend',
+ "name=module_name,type=CephString,req=true",
+ 'Select orchestrator module backend')
+ @handle_exceptions
+ def _set_backend(self, module_name):
"""
We implement a setter command instead of just having the user
modify the setting directly, so that we can validate they're setting
There isn't a mechanism for ensuring they don't *disable* the module
later, but this is better than nothing.
"""
-
mgr_map = self.get("mgr_map")
- module_name = cmd['module']
if module_name == "":
self.set_module_option("orchestrator", None)
return HandleCommandResult(-errno.EINVAL, stderr="Module '{0}' not found".format(module_name))
+ @CLIReadCommand('orchestrator status',
+ desc='Report configured backend and its status')
+ @handle_exceptions
def _status(self):
- try:
- avail, why = self.available()
- except NoOrchestrator:
- return HandleCommandResult(-errno.ENODEV,
- stderr="No orchestrator configured (try "
- "`ceph orchestrator set backend`)")
+ avail, why = self.available()
if avail is None:
# The module does not report its availability
avail,
" ({0})".format(why) if not avail else ""
))
-
- def handle_command(self, inbuf, cmd):
- try:
- return self._handle_command(inbuf, cmd)
- except NoOrchestrator:
- return HandleCommandResult(-errno.ENODEV, stderr="No orchestrator configured")
- except ImportError as e:
- return HandleCommandResult(-errno.ENOENT, stderr=str(e))
- except NotImplementedError:
- return HandleCommandResult(-errno.EINVAL, stderr="Command not found")
-
- def _handle_command(self, inbuf, cmd):
- if cmd['prefix'] == "orchestrator device ls":
- return self._list_devices(cmd)
- elif cmd['prefix'] == "orchestrator service ls":
- return self._list_services(cmd)
- elif cmd['prefix'] == "orchestrator service status":
- return self._list_services(cmd) # TODO: create more detailed output
- elif cmd['prefix'] == "orchestrator osd rm":
- return self._osd_rm(cmd)
- elif cmd['prefix'] == "orchestrator mds add":
- return self._mds_add(cmd)
- elif cmd['prefix'] == "orchestrator mds rm":
- return self._mds_rm(cmd)
- elif cmd['prefix'] == "orchestrator rgw add":
- return self._rgw_add(cmd)
- elif cmd['prefix'] == "orchestrator rgw rm":
- return self._rgw_rm(cmd)
- elif cmd['prefix'] == "orchestrator nfs add":
- return self._nfs_add(cmd)
- elif cmd['prefix'] == "orchestrator nfs rm":
- return self._nfs_rm(cmd)
- elif cmd['prefix'] == "orchestrator service":
- return self._service_action(cmd)
- elif cmd['prefix'] == "orchestrator service-instance":
- return self._service_instance_action(cmd)
- elif cmd['prefix'] == "orchestrator set backend":
- return self._set_backend(cmd)
- elif cmd['prefix'] == "orchestrator status":
- return self._status()
- elif cmd['prefix'] == "orchestrator osd create":
- return self._create_osd(inbuf, cmd)
- elif cmd['prefix'] == "orchestrator osd remove":
- return self._remove_osd(cmd)
- else:
- raise NotImplementedError()