--- /dev/null
+# -*- coding: utf-8 -*-
+import functools
+import inspect
+from typing import Any, Callable
+
+from mgr_module import CLICommandBase, HandleCommandResult, HandlerFuncType
+
+DBCLICommand = CLICommandBase.make_registry_subtype("DBCLICommand")
+
+
+class DBCommand(dict):
+ """
+ Helper class to declare options for COMMANDS list.
+
+ It also allows to specify prefix and args separately, as well as storing a
+ handler callable.
+
+ Usage:
+ >>> def handler(): return 0, "", ""
+ >>> DBCommand(prefix="example",
+ ... handler=handler,
+ ... perm='w')
+ {'perm': 'w', 'poll': False}
+ """
+
+ def __init__(
+ self,
+ prefix: str,
+ handler: HandlerFuncType,
+ perm: str = "rw",
+ poll: bool = False,
+ ):
+ super().__init__(perm=perm,
+ poll=poll)
+ self.prefix = prefix
+ self.handler = handler
+
+ @staticmethod
+ def returns_command_result(instance: Any,
+ f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
+ @functools.wraps(f)
+ def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
+ retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
+ return HandleCommandResult(retval, stdout, stderr)
+ wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined]
+ return wrapper
+
+ def register(self, instance: bool = False) -> HandlerFuncType:
+ """
+ Register a CLICommand handler. It allows an instance to register bound
+ methods. In that case, the mgr instance is not passed, and it's expected
+ to be available in the class instance.
+ It also uses HandleCommandResult helper to return a wrapped a tuple of 3
+ items.
+ """
+ cmd = DBCLICommand(prefix=self.prefix, perm=self['perm'])
+ return cmd(self.returns_command_result(instance, self.handler))
+ return None
from typing_extensions import Literal
from ceph.cryptotools.select import choose_crypto_caller
-from mgr_module import CLIReadCommand, CLIWriteCommand, HandleCommandResult, \
- MgrModule, MgrStandbyModule, NotifyType, Option, _get_localized_key
+from mgr_module import HandleCommandResult, MgrModule, MgrStandbyModule, \
+ NotifyType, Option, _get_localized_key
from mgr_util import ServerConfigException, build_url, \
create_self_signed_cert, get_default_addr, verify_tls_files
from . import mgr
+from .cli import DBCLICommand
from .controllers import nvmeof # noqa # pylint: disable=unused-import
from .controllers import Router, json_error_page
from .grafana import push_local_dashboards
"""
dashboard module entrypoint
"""
+ CLICommand = DBCLICommand
COMMANDS = [
{
cluster_credentials_files, clusters_credentials = multi_cluster_instance.get_cluster_credentials_files(targets) # noqa E501 #pylint: disable=line-too-long
return cluster_credentials_files, clusters_credentials
- @CLIWriteCommand("dashboard set-ssl-certificate")
+ @DBCLICommand.Write("dashboard set-ssl-certificate")
def set_ssl_certificate(self, mgr_id: Optional[str] = None, inbuf: Optional[str] = None):
return self._set_ssl_item('certificate', 'crt', mgr_id, inbuf)
- @CLIWriteCommand("dashboard set-ssl-certificate-key")
+ @DBCLICommand.Write("dashboard set-ssl-certificate-key")
def set_ssl_certificate_key(self, mgr_id: Optional[str] = None, inbuf: Optional[str] = None):
return self._set_ssl_item('certificate key', 'key', mgr_id, inbuf)
- @CLIWriteCommand("dashboard create-self-signed-cert")
+ @DBCLICommand.Write("dashboard create-self-signed-cert")
def set_mgr_created_self_signed_cert(self):
cert, pkey = create_self_signed_cert('IT', 'ceph-dashboard')
result = HandleCommandResult(*self.set_ssl_certificate(inbuf=cert))
return result
return 0, 'Self-signed certificate created', ''
- @CLIWriteCommand("dashboard set-rgw-credentials")
+ @DBCLICommand.Write("dashboard set-rgw-credentials")
def set_rgw_credentials(self):
try:
rgw_service_manager = RgwServiceManager()
return 0, 'RGW credentials configured', ''
- @CLIWriteCommand("dashboard set-rgw-hostname")
+ @DBCLICommand.Write("dashboard set-rgw-hostname")
def set_rgw_hostname(self, daemon_name: str, hostname: str):
try:
rgw_service_manager = RgwServiceManager()
except Exception as error:
return -errno.EINVAL, '', str(error)
- @CLIWriteCommand("dashboard unset-rgw-hostname")
+ @DBCLICommand.Write("dashboard unset-rgw-hostname")
def unset_rgw_hostname(self, daemon_name: str):
try:
rgw_service_manager = RgwServiceManager()
except Exception as error:
return -errno.EINVAL, '', str(error)
- @CLIWriteCommand("dashboard set-login-banner")
+ @DBCLICommand.Write("dashboard set-login-banner")
def set_login_banner(self, inbuf: str):
'''
Set the custom login banner read from -i <file>
mgr.set_store('custom_login_banner', inbuf)
return HandleCommandResult(stdout=f'{item_label} added')
- @CLIReadCommand("dashboard get-login-banner")
+ @DBCLICommand.Read("dashboard get-login-banner")
def get_login_banner(self):
'''
Get the custom login banner text
else:
return HandleCommandResult(stdout=banner_text)
- @CLIWriteCommand("dashboard unset-login-banner")
+ @DBCLICommand.Write("dashboard unset-login-banner")
def unset_login_banner(self):
'''
Unset the custom login banner
# allow cors by setting cross_origin_url
# the value is a comma separated list of URLs
- @CLIWriteCommand("dashboard set-cross-origin-url")
+ @DBCLICommand.Write("dashboard set-cross-origin-url")
def set_cross_origin_url(self, value: str):
cross_origin_urls = self.get_module_option('cross_origin_url', '')
cross_origin_urls_list = [url.strip()
configure_cors()
return 0, 'Cross-origin URL set', ''
- @CLIReadCommand("dashboard get-cross-origin-url")
+ @DBCLICommand.Read("dashboard get-cross-origin-url")
def get_cross_origin_url(self):
urls = self.get_module_option('cross_origin_url', '')
if urls:
return HandleCommandResult(stdout=urls) # type: ignore
return HandleCommandResult(stdout='No cross-origin URL set')
- @CLIReadCommand("dashboard rm-cross-origin-url")
+ @DBCLICommand.Read("dashboard rm-cross-origin-url")
def rm_cross_origin_url(self, value: str):
urls = self.get_module_option('cross_origin_url', '')
urls_list = [url.strip() for url in urls.split(',')] # type: ignore
from typing import Dict, List, Optional, Set, no_type_check
import cherrypy
-from mgr_module import CLICommand, Option
+from mgr_module import Option
from mgr_util import CLIWarning
+from ..cli import DBCLICommand
from ..controllers.cephfs import CephFS
from ..controllers.iscsi import Iscsi, IscsiTarget
from ..controllers.nfs import NFSGaneshaExports, NFSGaneshaUi
@PM.add_hook
def register_commands(self):
- @CLICommand("dashboard feature")
+ @DBCLICommand("dashboard feature")
def cmd(mgr,
action: Actions = Actions.STATUS,
features: Optional[List[Features]] = None):
from typing import Dict, NamedTuple, Optional
from ceph.utils import datetime_now, datetime_to_str, parse_timedelta, str_to_datetime
-from mgr_module import CLICommand
+from ..cli import DBCLICommand
from . import PLUGIN_MANAGER as PM
from .plugin import SimplePlugin as SP
@PM.add_hook
def register_commands(self):
- @CLICommand("dashboard {name} get".format(name=self.NAME))
+ @DBCLICommand("dashboard {name} get".format(name=self.NAME))
def _get(_):
stdout: str
value: str = self.get_option(self.NAME)
'expires="{expires}"'.format(**data)
return 0, stdout, ''
- @CLICommand("dashboard {name} set".format(name=self.NAME))
+ @DBCLICommand("dashboard {name} set".format(name=self.NAME))
def _set(_, severity: MotdSeverity, expires: str, message: str):
if expires != '0':
delta = parse_timedelta(expires)
self.set_option(self.NAME, value)
return 0, 'Message of the day has been set.', ''
- @CLICommand("dashboard {name} clear".format(name=self.NAME))
+ @DBCLICommand("dashboard {name} clear".format(name=self.NAME))
def _clear(_):
self.set_option(self.NAME, '')
return 0, 'Message of the day has been cleared.', ''
from typing import no_type_check
-from mgr_module import Command, Option
+from mgr_module import Option
+from ..cli import DBCommand
from . import PLUGIN_MANAGER as PM
from . import interfaces as I # noqa: E741,N812
- Default Mixins/Interfaces: CanMgr, HasOptions & HasCommands
- Options are defined by OPTIONS class variable, instead from get_options hook
- Commands are created with by COMMANDS list of Commands() and handlers
- (less compact than CLICommand, but allows using method instances)
+ (less compact than but allows using method instances)
"""
Option = Option
- Command = Command
+ Command = DBCommand
@PM.add_hook
def get_options(self):
from typing import List, Optional, Sequence
from ceph.cryptotools.select import get_crypto_caller
-from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand
+from mgr_module import CLICheckNonemptyFileInput
from mgr_util import password_hash
from .. import mgr
+from ..cli import DBCLICommand
from ..exceptions import PasswordPolicyException, PermissionNotValid, \
PwdExpirationDateNotValid, RoleAlreadyExists, RoleDoesNotExist, \
RoleIsAssociatedWithUser, RoleNotInUser, ScopeNotInRole, ScopeNotValid, \
# CLI dashboard access control scope commands
-@CLIWriteCommand('dashboard set-login-credentials')
+@DBCLICommand.Write('dashboard set-login-credentials')
@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
def set_login_credentials_cmd(_, username: str, inbuf: str):
'''
Username and password updated''', ''
-@CLIReadCommand('dashboard ac-role-show')
+@DBCLICommand.Read('dashboard ac-role-show')
def ac_role_show_cmd(_, rolename: Optional[str] = None):
'''
Show role info
return 0, json.dumps(role.to_dict()), ''
-@CLIWriteCommand('dashboard ac-role-create')
+@DBCLICommand.Write('dashboard ac-role-create')
def ac_role_create_cmd(_, rolename: str, description: Optional[str] = None):
'''
Create a new access control role
return -errno.EEXIST, '', str(ex)
-@CLIWriteCommand('dashboard ac-role-delete')
+@DBCLICommand.Write('dashboard ac-role-delete')
def ac_role_delete_cmd(_, rolename: str):
'''
Delete an access control role
return -errno.EPERM, '', str(ex)
-@CLIWriteCommand('dashboard ac-role-add-scope-perms')
+@DBCLICommand.Write('dashboard ac-role-add-scope-perms')
def ac_role_add_scope_perms_cmd(_,
rolename: str,
scopename: str,
.format(Permission.all_permissions())
-@CLIWriteCommand('dashboard ac-role-del-scope-perms')
+@DBCLICommand.Write('dashboard ac-role-del-scope-perms')
def ac_role_del_scope_perms_cmd(_, rolename: str, scopename: str):
'''
Delete the scope permissions for a role
return -errno.ENOENT, '', str(ex)
-@CLIReadCommand('dashboard ac-user-show')
+@DBCLICommand.Read('dashboard ac-user-show')
def ac_user_show_cmd(_, username: Optional[str] = None):
'''
Show user info
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-create')
+@DBCLICommand.Write('dashboard ac-user-create')
@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
def ac_user_create_cmd(_, username: str, inbuf: str,
rolename: Optional[str] = None,
return 0, json.dumps(user.to_dict()), ''
-@CLIWriteCommand('dashboard ac-user-enable')
+@DBCLICommand.Write('dashboard ac-user-enable')
def ac_user_enable(_, username: str):
'''
Enable a user
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-disable')
+@DBCLICommand.Write('dashboard ac-user-disable')
def ac_user_disable(_, username: str):
'''
Disable a user
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-delete')
+@DBCLICommand.Write('dashboard ac-user-delete')
def ac_user_delete_cmd(_, username: str):
'''
Delete user
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-set-roles')
+@DBCLICommand.Write('dashboard ac-user-set-roles')
def ac_user_set_roles_cmd(_, username: str, roles: Sequence[str]):
'''
Set user roles
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-add-roles')
+@DBCLICommand.Write('dashboard ac-user-add-roles')
def ac_user_add_roles_cmd(_, username: str, roles: Sequence[str]):
'''
Add roles to user
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-del-roles')
+@DBCLICommand.Write('dashboard ac-user-del-roles')
def ac_user_del_roles_cmd(_, username: str, roles: Sequence[str]):
'''
Delete roles from user
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-set-password')
+@DBCLICommand.Write('dashboard ac-user-set-password')
@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
def ac_user_set_password(_, username: str, inbuf: str,
force_password: bool = False):
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-set-password-hash')
+@DBCLICommand.Write('dashboard ac-user-set-password-hash')
@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
def ac_user_set_password_hash(_, username: str, inbuf: str):
'''
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-set-info')
+@DBCLICommand.Write('dashboard ac-user-set-info')
def ac_user_set_info(_, username: str, name: str, email: str):
'''
Set user info
import json
from typing import Optional
-from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand
+from mgr_module import CLICheckNonemptyFileInput
+from ..cli import DBCLICommand
from ..rest_client import RequestException
from .iscsi_client import IscsiClient
from .iscsi_config import InvalidServiceUrl, IscsiGatewayAlreadyExists, \
ManagedByOrchestratorException
-@CLIReadCommand('dashboard iscsi-gateway-list')
+@DBCLICommand.Read('dashboard iscsi-gateway-list')
def list_iscsi_gateways(_):
'''
List iSCSI gateways
return 0, json.dumps(IscsiGatewaysConfig.get_gateways_config()), ''
-@CLIWriteCommand('dashboard iscsi-gateway-add')
+@DBCLICommand.Write('dashboard iscsi-gateway-add')
@CLICheckNonemptyFileInput(desc='iSCSI gateway configuration')
def add_iscsi_gateway(_, inbuf, name: Optional[str] = None):
'''
return -errno.EINVAL, '', str(ex)
-@CLIWriteCommand('dashboard iscsi-gateway-rm')
+@DBCLICommand.Write('dashboard iscsi-gateway-rm')
def remove_iscsi_gateway(_, name: str):
'''
Remove iSCSI gateway configuration
Union, get_args, get_origin, get_type_hints
import yaml
-from mgr_module import CLICheckNonemptyFileInput, CLICommand, CLIReadCommand, \
- CLIWriteCommand, HandleCommandResult, HandlerFuncType
+from mgr_module import CLICheckNonemptyFileInput, HandleCommandResult, HandlerFuncType
from prettytable import PrettyTable
+from ..cli import DBCLICommand
from ..model.nvmeof import CliFieldTransformer, CliFlags, CliHeader
from ..rest_client import RequestException
from .nvmeof_conf import ManagedByOrchestratorException, \
NvmeofGatewayAlreadyExists, NvmeofGatewaysConfig
-@CLIReadCommand('dashboard nvmeof-gateway-list')
+@DBCLICommand.Read('dashboard nvmeof-gateway-list')
def list_nvmeof_gateways(_):
'''
List NVMe-oF gateways
return 0, json.dumps(NvmeofGatewaysConfig.get_gateways_config()), ''
-@CLIWriteCommand('dashboard nvmeof-gateway-add')
+@DBCLICommand.Write('dashboard nvmeof-gateway-add')
@CLICheckNonemptyFileInput(desc='NVMe-oF gateway configuration')
def add_nvmeof_gateway(_, inbuf, name: str, group: str, daemon_name: str):
'''
return -errno.EINVAL, '', str(ex)
-@CLIWriteCommand('dashboard nvmeof-gateway-rm')
+@DBCLICommand.Write('dashboard nvmeof-gateway-rm')
def remove_nvmeof_gateway(_, name: str, daemon_name: str = ''):
'''
Remove NVMe-oF gateway configuration
return self._convert_to_text_output(data, model)
-class NvmeofCLICommand(CLICommand):
+class NvmeofCLICommand(DBCLICommand):
desc: str
def __init__(self, prefix, model: Type[NamedTuple], alias=None, perm='rw', poll=False):
from typing import Dict, Optional
from urllib import parse
-from mgr_module import CLIWriteCommand, HandleCommandResult
+from mgr_module import HandleCommandResult
from .. import mgr
# Saml2 and OAuth2 needed to be recognized by .__subclasses__()
# pylint: disable=unused-import
+from ..cli import DBCLICommand
from ..services.auth import AuthType, BaseAuth, OAuth2, Saml2 # noqa
from ..tools import prepare_url_prefix
mgr.SSO_DB = SsoDB.load() # type: ignore
-@CLIWriteCommand("dashboard sso enable oauth2")
+@DBCLICommand.Write("dashboard sso enable oauth2")
def enable_sso(_, roles_path: Optional[str] = None):
mgr.SSO_DB.protocol = AuthType.OAUTH2
if jmespath and roles_path:
from pyfakefs import fake_filesystem
from .. import mgr
+from ..cli import DBCLICommand
from ..controllers import generate_controller_routes, json_error_page
from ..controllers._version import APIVersion
from ..module import Module
class ModuleTestClass(Module):
"""Dashboard module subclass for testing the module methods."""
+ CLICommand = DBCLICommand
def __init__(self) -> None:
pass
self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist")
def test_add_role_invalid_scope_perms(self):
+ assert "dashboard ac-role-create" in self._dashboard_module.CLICommand.COMMANDS
self.test_create_role()
with self.assertRaises(CmdException) as ctx:
import errno
import json
+import logging
import unittest
from typing import Annotated, List, NamedTuple
from unittest.mock import MagicMock
import pytest
-from mgr_module import CLICommand, HandleCommandResult
+from mgr_module import CLICommandBase, HandleCommandResult
from ..controllers import EndpointDoc
from ..model.nvmeof import CliFieldTransformer, CliFlags, CliHeader
mock_result = {'a': 'b'}
super_mock = MagicMock()
super_mock.return_value = mock_result
- monkeypatch.setattr(CLICommand, 'call', super_mock)
+ monkeypatch.setattr(CLICommandBase, 'call', super_mock)
return super_mock
mock_result = None
super_mock = MagicMock()
super_mock.return_value = mock_result
- monkeypatch.setattr(CLICommand, 'call', super_mock)
+ monkeypatch.setattr(CLICommandBase, 'call', super_mock)
return super_mock
def test_command_return_cmd_result_default_format(self, base_call_mock, sample_command):
result = NvmeofCLICommand.COMMANDS[sample_command].call(MagicMock(), {})
+ logging.getLogger().error(result)
assert isinstance(result, HandleCommandResult)
assert result.retval == 0
assert result.stdout == (
if k != 'self' and v is not None)
-class Command(dict):
- """
- Helper class to declare options for COMMANDS list.
-
- It also allows to specify prefix and args separately, as well as storing a
- handler callable.
-
- Usage:
- >>> def handler(): return 0, "", ""
- >>> Command(prefix="example",
- ... handler=handler,
- ... perm='w')
- {'perm': 'w', 'poll': False}
- """
-
- def __init__(
- self,
- prefix: str,
- handler: HandlerFuncType,
- perm: str = "rw",
- poll: bool = False,
- ):
- super().__init__(perm=perm,
- poll=poll)
- self.prefix = prefix
- self.handler = handler
-
- @staticmethod
- def returns_command_result(instance: Any,
- f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
- @functools.wraps(f)
- def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
- retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
- return HandleCommandResult(retval, stdout, stderr)
- wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined]
- return wrapper
-
- def register(self, instance: bool = False) -> HandlerFuncType:
- """
- Register a CLICommand handler. It allows an instance to register bound
- methods. In that case, the mgr instance is not passed, and it's expected
- to be available in the class instance.
- It also uses HandleCommandResult helper to return a wrapped a tuple of 3
- items.
- """
- cmd = CLICommand(prefix=self.prefix, perm=self['perm'])
- return cmd(self.returns_command_result(instance, self.handler))
-
-
class CPlusPlusHandler(logging.Handler):
def __init__(self, module_inst: Any):
super(CPlusPlusHandler, self).__init__()