@no_type_check
def handler(self, action):
+ '''
+ Control and report debug status in Ceph-Dashboard
+ '''
ret = 0
msg = ''
if action in [Actions.ENABLE.value, Actions.DISABLE.value]:
prefix="dashboard {name}".format(name=NAME),
args="name=action,type=CephChoices,strings={states}".format(
states="|".join(a.value for a in Actions)),
- desc="Control and report debug status in Ceph-Dashboard",
handler=handler
)
]
"name=action,type=CephChoices,strings={} ".format(
"|".join(a.value for a in Actions))
+ "name=features,type=CephChoices,strings={},req=false,n=N".format(
- "|".join(f.value for f in Features)),
- "Enable or disable features in Ceph-Mgr Dashboard")
+ "|".join(f.value for f in Features)))
def cmd(mgr, action, features=None):
+ '''
+ Enable or disable features in Ceph-Mgr Dashboard
+ '''
ret = 0
msg = []
if action in [Actions.ENABLE.value, Actions.DISABLE.value]:
# CLI dashboard access control scope commands
@CLIWriteCommand('dashboard set-login-credentials',
- 'name=username,type=CephString',
- 'Set the login credentials. Password read from -i <file>')
+ 'name=username,type=CephString')
@CLICheckNonemptyFileInput
def set_login_credentials_cmd(_, username, inbuf):
+ '''
+ Set the login credentials. Password read from -i <file>
+ '''
password = inbuf
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
@CLIReadCommand('dashboard ac-role-show',
- 'name=rolename,type=CephString,req=false',
- 'Show role info')
+ 'name=rolename,type=CephString,req=false')
def ac_role_show_cmd(_, rolename=None):
+ '''
+ Show role info
+ '''
if not rolename:
roles = dict(mgr.ACCESS_CTRL_DB.roles)
roles.update(SYSTEM_ROLES)
@CLIWriteCommand('dashboard ac-role-create',
'name=rolename,type=CephString '
- 'name=description,type=CephString,req=false',
- 'Create a new access control role')
+ 'name=description,type=CephString,req=false')
def ac_role_create_cmd(_, rolename, description=None):
+ '''
+ Create a new access control role
+ '''
try:
role = mgr.ACCESS_CTRL_DB.create_role(rolename, description)
mgr.ACCESS_CTRL_DB.save()
@CLIWriteCommand('dashboard ac-role-delete',
- 'name=rolename,type=CephString',
- 'Delete an access control role')
+ 'name=rolename,type=CephString')
def ac_role_delete_cmd(_, rolename):
+ '''
+ Delete an access control role
+ '''
try:
mgr.ACCESS_CTRL_DB.delete_role(rolename)
mgr.ACCESS_CTRL_DB.save()
@CLIWriteCommand('dashboard ac-role-add-scope-perms',
'name=rolename,type=CephString '
'name=scopename,type=CephString '
- 'name=permissions,type=CephString,n=N',
- 'Add the scope permissions for a role')
+ 'name=permissions,type=CephString,n=N')
def ac_role_add_scope_perms_cmd(_, rolename, scopename, permissions):
+ '''
+ Add the scope permissions for a role
+ '''
try:
role = mgr.ACCESS_CTRL_DB.get_role(rolename)
perms_array = [perm.strip() for perm in permissions]
@CLIWriteCommand('dashboard ac-role-del-scope-perms',
'name=rolename,type=CephString '
- 'name=scopename,type=CephString',
- 'Delete the scope permissions for a role')
+ 'name=scopename,type=CephString')
def ac_role_del_scope_perms_cmd(_, rolename, scopename):
+ '''
+ Delete the scope permissions for a role
+ '''
try:
role = mgr.ACCESS_CTRL_DB.get_role(rolename)
role.del_scope_permissions(scopename)
@CLIReadCommand('dashboard ac-user-show',
- 'name=username,type=CephString,req=false',
- 'Show user info')
+ 'name=username,type=CephString,req=false')
def ac_user_show_cmd(_, username=None):
+ '''
+ Show user info
+ '''
if not username:
users = mgr.ACCESS_CTRL_DB.users
users_list = [name for name, _ in users.items()]
'name=enabled,type=CephBool,req=false '
'name=force_password,type=CephBool,req=false '
'name=pwd_expiration_date,type=CephInt,req=false '
- 'name=pwd_update_required,type=CephBool,req=false',
- 'Create a user. Password read from -i <file>')
+ 'name=pwd_update_required,type=CephBool,req=false')
@CLICheckNonemptyFileInput
def ac_user_create_cmd(_, username, inbuf, rolename=None, name=None,
email=None, enabled=True, force_password=False,
pwd_expiration_date=None, pwd_update_required=False):
+ '''
+ Create a user. Password read from -i <file>
+ '''
password = inbuf
try:
role = mgr.ACCESS_CTRL_DB.get_role(rolename) if rolename else None
@CLIWriteCommand('dashboard ac-user-enable',
- 'name=username,type=CephString',
- 'Enable a user')
+ 'name=username,type=CephString')
def ac_user_enable(_, username):
+ '''
+ Enable a user
+ '''
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
user.enabled = True
@CLIWriteCommand('dashboard ac-user-disable',
- 'name=username,type=CephString',
- 'Disable a user')
+ 'name=username,type=CephString')
def ac_user_disable(_, username):
+ '''
+ Disable a user
+ '''
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
user.enabled = False
@CLIWriteCommand('dashboard ac-user-delete',
- 'name=username,type=CephString',
- 'Delete user')
+ 'name=username,type=CephString')
def ac_user_delete_cmd(_, username):
+ '''
+ Delete user
+ '''
try:
mgr.ACCESS_CTRL_DB.delete_user(username)
mgr.ACCESS_CTRL_DB.save()
@CLIWriteCommand('dashboard ac-user-set-roles',
'name=username,type=CephString '
- 'name=roles,type=CephString,n=N',
- 'Set user roles')
+ 'name=roles,type=CephString,n=N')
def ac_user_set_roles_cmd(_, username, roles):
+ '''
+ Set user roles
+ '''
rolesname = roles
roles = []
for rolename in rolesname:
@CLIWriteCommand('dashboard ac-user-add-roles',
'name=username,type=CephString '
- 'name=roles,type=CephString,n=N',
- 'Add roles to user')
+ 'name=roles,type=CephString,n=N')
def ac_user_add_roles_cmd(_, username, roles):
+ '''
+ Add roles to user
+ '''
rolesname = roles
roles = []
for rolename in rolesname:
@CLIWriteCommand('dashboard ac-user-del-roles',
'name=username,type=CephString '
- 'name=roles,type=CephString,n=N',
- 'Delete roles from user')
+ 'name=roles,type=CephString,n=N')
def ac_user_del_roles_cmd(_, username, roles):
+ '''
+ Delete roles from user
+ '''
rolesname = roles
roles = []
for rolename in rolesname:
@CLIWriteCommand('dashboard ac-user-set-password',
'name=username,type=CephString '
- 'name=force_password,type=CephBool,req=false',
- 'Set user password from -i <file>')
+ 'name=force_password,type=CephBool,req=false')
@CLICheckNonemptyFileInput
def ac_user_set_password(_, username, inbuf, force_password=False):
+ '''
+ Set user password from -i <file>
+ '''
password = inbuf
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
@CLIWriteCommand('dashboard ac-user-set-password-hash',
- 'name=username,type=CephString',
- 'Set user password bcrypt hash from -i <file>')
+ 'name=username,type=CephString')
@CLICheckNonemptyFileInput
def ac_user_set_password_hash(_, username, inbuf):
+ '''
+ Set user password bcrypt hash from -i <file>
+ '''
hashed_password = inbuf
try:
# make sure the hashed_password is actually a bcrypt hash
@CLIWriteCommand('dashboard ac-user-set-info',
'name=username,type=CephString '
'name=name,type=CephString '
- 'name=email,type=CephString',
- 'Set user info')
+ 'name=email,type=CephString')
def ac_user_set_info(_, username, name, email):
+ '''
+ Set user info
+ '''
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
if name:
ManagedByOrchestratorException
-@CLIReadCommand('dashboard iscsi-gateway-list', desc='List iSCSI gateways')
+@CLIReadCommand('dashboard iscsi-gateway-list')
def list_iscsi_gateways(_):
+ '''
+ List iSCSI gateways
+ '''
return 0, json.dumps(IscsiGatewaysConfig.get_gateways_config()), ''
@CLIWriteCommand('dashboard iscsi-gateway-add',
- 'name=name,type=CephString,req=false',
- 'Add iSCSI gateway configuration. Gateway URL read from -i <file>')
+ 'name=name,type=CephString,req=false')
@CLICheckNonemptyFileInput
def add_iscsi_gateway(_, inbuf, name=None):
+ '''
+ Add iSCSI gateway configuration. Gateway URL read from -i <file>
+ '''
service_url = inbuf
try:
IscsiGatewaysConfig.validate_service_url(service_url)
@CLIWriteCommand('dashboard iscsi-gateway-rm',
- 'name=name,type=CephString',
- 'Remove iSCSI gateway configuration')
+ 'name=name,type=CephString')
def remove_iscsi_gateway(_, name):
+ '''
+ Remove iSCSI gateway configuration
+ '''
try:
IscsiGatewaysConfig.remove_gateway(name)
return 0, 'Success', ''
@CLICommand('device query-daemon-health-metrics',
args='name=who,type=CephString',
- desc='Get device health metrics for a given daemon',
perm='r')
def do_query_daemon_health_metrics(self, who=''):
+ '''
+ Get device health metrics for a given daemon
+ '''
if not self.is_valid_daemon_name(who):
return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
(daemon_type, daemon_id) = who.split('.')
@CLICommand('device scrape-daemon-health-metrics',
args='name=who,type=CephString',
- desc='Scrape and store device health metrics '
- 'for a given daemon',
perm='r')
def do_scrape_daemon_health_metrics(self, who=''):
+ '''
+ Scrape and store device health metrics for a given daemon
+ '''
if not self.is_valid_daemon_name(who):
return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
(daemon_type, daemon_id) = who.split('.')
@CLICommand('device scrape-daemon-health-metrics',
args='name=devid,type=CephString,req=False',
- desc='Scrape and store device health metrics',
perm='r')
def do_scrape_health_metrics(self, devid=None):
+ '''
+ Scrape and store device health metrics
+ '''
if devid is None:
return self.scrape_all()
else:
@CLICommand('device get-health-metrics',
args=('name=devid,type=CephString ' +
'name=sample,type=CephString,req=False'),
- desc='Show stored device metrics for the device',
perm='r')
def do_get_health_metrics(self, devid, sample=None):
+ '''
+ Show stored device metrics for the device
+ '''
return self.show_device_metrics(devid, sample)
@CLICommand('device check-health',
- desc='Check life expectancy of devices',
perm='rw')
def do_check_health(self):
+ '''
+ Check life expectancy of devices
+ '''
return self.check_health()
@CLICommand('device monitoring on',
- desc='Enable device health monitoring',
perm='rw')
def do_monitoring_on(self):
+ '''
+ Enable device health monitoring
+ '''
self.set_module_option('enable_monitoring', True)
self.event.set()
@CLICommand('device monitoring off',
- desc='Disable device health monitoring',
perm='rw')
def do_monitoring_off(self):
+ '''
+ Disable device health monitoring
+ '''
self.set_module_option('enable_monitoring', False)
self.set_health_checks({}) # avoid stuck health alerts
@CLICommand('device predict-life-expectancy',
args='name=devid,type=CephString,req=true',
- desc='Predict life expectancy with local predictor',
perm='r')
def do_predict_life_expectancy(self, devid):
+ '''
+ Predict life expectancy with local predictor
+ '''
return self.predict_lift_expectancy(devid)
def self_test(self):
from typing_extensions import Literal
+import inspect
import logging
import errno
import functools
def __call__(self, func):
self.func = func
+ if not self.desc:
+ self.desc = inspect.getdoc(func)
self.COMMANDS[self.prefix] = self
return self.func
return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
-def CLIReadCommand(prefix, args="", desc=""):
- return CLICommand(prefix, args, desc, "r")
+def CLIReadCommand(prefix, args=""):
+ return CLICommand(prefix, args, "r")
-def CLIWriteCommand(prefix, args="", desc=""):
- return CLICommand(prefix, args, desc, "w")
+def CLIWriteCommand(prefix, args=""):
+ return CLICommand(prefix, args, "w")
def CLICheckNonemptyFileInput(func):
def __init__(
self,
prefix,
+ handler,
args=None,
perm="rw",
desc=None,
poll=False,
- handler=None
):
super(Command, self).__init__(
cmd=prefix + (' ' + args if args else ''),
perm=perm,
- desc=desc,
poll=poll)
self.prefix = prefix
self.args = args
self.handler = handler
+ self['desc'] = inspect.getdoc(self.handler)
def register(self, instance=False):
"""
'name=path,type=CephString,req=false '
'name=subvol,type=CephString,req=false '
'name=fs,type=CephString,req=false '
- 'name=format,type=CephString,req=false',
- 'List current snapshot schedules')
+ 'name=format,type=CephString,req=false')
def snap_schedule_get(self, path='/', subvol=None, fs=None, format='plain'):
+ '''
+ List current snapshot schedules
+ '''
use_fs = fs if fs else self.default_fs
try:
ret_scheds = self.client.get_snap_schedules(use_fs, path)
'name=recursive,type=CephString,req=false '
'name=subvol,type=CephString,req=false '
'name=fs,type=CephString,req=false '
- 'name=format,type=CephString,req=false',
- 'Get current snapshot schedule for <path>')
+ 'name=format,type=CephString,req=false')
def snap_schedule_list(self, path, subvol=None, recursive=False, fs=None,
format='plain'):
+ '''
+ Get current snapshot schedule for <path>
+ '''
try:
use_fs = fs if fs else self.default_fs
scheds = self.client.list_snap_schedules(use_fs, path, recursive)
'name=snap-schedule,type=CephString '
'name=start,type=CephString,req=false '
'name=fs,type=CephString,req=false '
- 'name=subvol,type=CephString,req=false',
- 'Set a snapshot schedule for <path>')
+ 'name=subvol,type=CephString,req=false')
def snap_schedule_add(self,
path,
snap_schedule,
start=None,
fs=None,
subvol=None):
+ '''
+ Set a snapshot schedule for <path>
+ '''
try:
use_fs = fs if fs else self.default_fs
abs_path = self.resolve_subvolume_path(fs, subvol, path)
'name=repeat,type=CephString,req=false '
'name=start,type=CephString,req=false '
'name=subvol,type=CephString,req=false '
- 'name=fs,type=CephString,req=false',
- 'Remove a snapshot schedule for <path>')
+ 'name=fs,type=CephString,req=false')
def snap_schedule_rm(self,
path,
repeat=None,
start=None,
subvol=None,
fs=None):
+ '''
+ Remove a snapshot schedule for <path>
+ '''
try:
use_fs = fs if fs else self.default_fs
abs_path = self.resolve_subvolume_path(fs, subvol, path)
'name=retention-spec-or-period,type=CephString '
'name=retention-count,type=CephString,req=false '
'name=fs,type=CephString,req=false '
- 'name=subvol,type=CephString,req=false',
- 'Set a retention specification for <path>')
+ 'name=subvol,type=CephString,req=false')
def snap_schedule_retention_add(self,
path,
retention_spec_or_period,
retention_count=None,
fs=None,
subvol=None):
+ '''
+ Set a retention specification for <path>
+ '''
try:
use_fs = fs if fs else self.default_fs
abs_path = self.resolve_subvolume_path(fs, subvol, path)
'name=retention-spec-or-period,type=CephString '
'name=retention-count,type=CephString,req=false '
'name=fs,type=CephString,req=false '
- 'name=subvol,type=CephString,req=false',
- 'Remove a retention specification for <path>')
+ 'name=subvol,type=CephString,req=false')
def snap_schedule_retention_rm(self,
path,
retention_spec_or_period,
retention_count=None,
fs=None,
subvol=None):
+ '''
+ Remove a retention specification for <path>
+ '''
try:
use_fs = fs if fs else self.default_fs
abs_path = self.resolve_subvolume_path(fs, subvol, path)
'name=repeat,type=CephString,req=false '
'name=start,type=CephString,req=false '
'name=subvol,type=CephString,req=false '
- 'name=fs,type=CephString,req=false',
- 'Activate a snapshot schedule for <path>')
+ 'name=fs,type=CephString,req=false')
def snap_schedule_activate(self,
path,
repeat=None,
start=None,
subvol=None,
fs=None):
+ '''
+ Activate a snapshot schedule for <path>
+ '''
try:
use_fs = fs if fs else self.default_fs
abs_path = self.resolve_subvolume_path(fs, subvol, path)
'name=repeat,type=CephString,req=false '
'name=start,type=CephString,req=false '
'name=subvol,type=CephString,req=false '
- 'name=fs,type=CephString,req=false',
- 'Deactivate a snapshot schedule for <path>')
+ 'name=fs,type=CephString,req=false')
def snap_schedule_deactivate(self,
path,
repeat=None,
start=None,
subvol=None,
fs=None):
+ '''
+ Deactivate a snapshot schedule for <path>
+ '''
try:
use_fs = fs if fs else self.default_fs
abs_path = self.resolve_subvolume_path(fs, subvol, path)
for p in completions:
p.evaluate()
- @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
+ @CLICommand('test_orchestrator load_data', args='', perm='w')
def _load_data(self, inbuf):
+ """
+ load dummy data into test orchestrator
+ """
try:
data = json.loads(inbuf)
self._init_data(data)