import tempfile
import threading
import time
-from typing import Optional
+from typing import TYPE_CHECKING, Optional
-from mgr_module import CLIWriteCommand, MgrModule, MgrStandbyModule, Option, _get_localized_key
+if TYPE_CHECKING:
+ if sys.version_info >= (3, 8):
+ from typing import Literal
+ else:
+ from typing_extensions import Literal
+
+from mgr_module import CLIWriteCommand, HandleCommandResult, MgrModule, \
+ MgrStandbyModule, Option, _get_localized_key
from mgr_util import ServerConfigException, create_self_signed_cert, \
get_default_addr, verify_tls_files
return uri
+if TYPE_CHECKING:
+ SslConfigKey = Literal['crt', 'key']
+
+
class Module(MgrModule, CherryPyConfig):
"""
dashboard module entrypoint
logger.info('Stopping engine...')
self.shutdown_event.set()
- @CLIWriteCommand("dashboard set-ssl-certificate")
- def set_ssl_certificate(self,
- mgr_id: Optional[str] = None,
- inbuf: Optional[bytes] = None):
+ def _set_ssl_item(self, item_label: str, item_key: 'SslConfigKey' = 'crt',
+ mgr_id: Optional[str] = None, inbuf: Optional[str] = None):
if inbuf is None:
- return -errno.EINVAL, '',\
- 'Please specify the certificate file with "-i" option'
+ return -errno.EINVAL, '', f'Please specify the {item_label} with "-i" option'
+
if mgr_id is not None:
- self.set_store(_get_localized_key(mgr_id, 'crt'), inbuf.decode())
+ self.set_store(_get_localized_key(mgr_id, item_key), inbuf)
else:
- self.set_store('crt', inbuf.decode())
- return 0, 'SSL certificate updated', ''
+ self.set_store(item_key, inbuf)
+ return 0, f'SSL {item_label} updated', ''
+
+ @CLIWriteCommand("dashboard set-ssl-certificate")
+ def set_ssl_certificate(self, mgr_id: Optional[str] = None, inbuf: Optional[str] = None):
+ return self._set_ssl_item('certificate', 'crt', mgr_id, inbuf)
@CLIWriteCommand("dashboard set-ssl-certificate-key")
- def set_ssl_certificate_key(self,
- mgr_id: Optional[str] = None,
- inbuf: Optional[bytes] = None):
- if inbuf is None:
- return -errno.EINVAL, '',\
- 'Please specify the certificate key file with "-i" option'
- if mgr_id is not None:
- self.set_store(_get_localized_key(mgr_id, 'key'), inbuf.decode())
- else:
- self.set_store('key', inbuf.decode())
- return 0, 'SSL certificate key updated', ''
+ def set_ssl_certificate_key(self, mgr_id: Optional[str] = None, inbuf: Optional[str] = None):
+ return self._set_ssl_item('certificate key', 'key', mgr_id, inbuf)
+
+ @CLIWriteCommand("dashboard create-self-signed-cert")
+ def set_mgr_created_self_signed_cert(self):
+ cert, pkey = create_self_signed_cert('IT', 'ceph-dashboard')
+ result = HandleCommandResult(*self.set_ssl_certificate(inbuf=cert))
+ if result.retval != 0:
+ return result
+
+ result = HandleCommandResult(*self.set_ssl_certificate_key(inbuf=pkey))
+ if result.retval != 0:
+ return result
+ return 0, 'Self-signed certificate created', ''
def handle_command(self, inbuf, cmd):
# pylint: disable=too-many-return-statements
if cmd['prefix'] == 'dashboard get-jwt-token-ttl':
ttl = self.get_module_option('jwt_token_ttl', JwtManager.JWT_TOKEN_TTL)
return 0, str(ttl), ''
- if cmd['prefix'] == 'dashboard create-self-signed-cert':
- self.create_self_signed_cert()
- return 0, 'Self-signed certificate created', ''
if cmd['prefix'] == 'dashboard grafana dashboards update':
push_local_dashboards()
return 0, 'Grafana dashboards updated', ''
return (-errno.EINVAL, '', 'Command not found \'{0}\''
.format(cmd['prefix']))
- def create_self_signed_cert(self):
- cert, pkey = create_self_signed_cert('IT', 'ceph-dashboard')
- self.set_store('crt', cert)
- self.set_store('key', pkey)
-
def notify(self, notify_type, notify_id):
NotificationQueue.new_notification(notify_type, notify_id)
import cherrypy
from cherrypy._cptools import HandlerWrapperTool
from cherrypy.test import helper
-from mgr_module import CLICommand
+from mgr_module import HandleCommandResult
from pyfakefs import fake_filesystem
from .. import DEFAULT_VERSION, mgr
from ..controllers import generate_controller_routes, json_error_page
+from ..module import Module
from ..plugins import PLUGIN_MANAGER, debug, feature_toggles # noqa
from ..services.auth import AuthManagerTool
from ..services.exception import dashboard_exception_handler
logger = logging.getLogger('tests')
+class ModuleTestClass(Module):
+ """Dashboard module subclass for testing the module methods."""
+
+ def __init__(self) -> None:
+ pass
+
+ def _unconfigure_logging(self) -> None:
+ pass
+
+
class CmdException(Exception):
def __init__(self, retcode, message):
super(CmdException, self).__init__(message)
self.retcode = retcode
-def exec_dashboard_cmd(command_handler, cmd, **kwargs):
- inbuf = kwargs['inbuf'] if 'inbuf' in kwargs else None
- cmd_dict = {'prefix': 'dashboard {}'.format(cmd)}
- cmd_dict.update(kwargs)
- if cmd_dict['prefix'] not in CLICommand.COMMANDS:
- ret, out, err = command_handler(cmd_dict)
- if ret < 0:
- raise CmdException(ret, err)
- try:
- return json.loads(out)
- except ValueError:
- return out
-
- ret, out, err = CLICommand.COMMANDS[cmd_dict['prefix']].call(mgr, cmd_dict, inbuf)
- if ret < 0:
- raise CmdException(ret, err)
- try:
- return json.loads(out)
- except ValueError:
- return out
-
-
class KVStoreMockMixin(object):
CONFIG_KEY_DICT = {}
return cls.CONFIG_KEY_DICT.get(key, None)
+# pylint: disable=protected-access
class CLICommandTestMixin(KVStoreMockMixin):
+ _dashboard_module = ModuleTestClass()
+
@classmethod
def exec_cmd(cls, cmd, **kwargs):
- return exec_dashboard_cmd(None, cmd, **kwargs)
+ inbuf = kwargs['inbuf'] if 'inbuf' in kwargs else None
+ cmd_dict = {'prefix': 'dashboard {}'.format(cmd)}
+ cmd_dict.update(kwargs)
+
+ result = HandleCommandResult(*cls._dashboard_module._handle_command(inbuf, cmd_dict))
+
+ if result.retval < 0:
+ raise CmdException(result.retval, result.stderr)
+ try:
+ return json.loads(result.stdout)
+ except ValueError:
+ return result.stdout
class FakeFsMixin(object):
--- /dev/null
+import errno
+import unittest
+
+from ..tests import CLICommandTestMixin, CmdException
+
+
+class SslTest(unittest.TestCase, CLICommandTestMixin):
+
+ def test_ssl_certificate_and_key(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('set-ssl-certificate', inbuf='', mgr_id='x')
+ self.assertEqual(ctx.exception.retcode, -errno.EINVAL)
+ self.assertEqual(str(ctx.exception), 'Please specify the certificate with "-i" option')
+
+ result = self.exec_cmd('set-ssl-certificate', inbuf='content', mgr_id='x')
+ self.assertEqual(result, 'SSL certificate updated')
+
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('set-ssl-certificate-key', inbuf='', mgr_id='x')
+ self.assertEqual(ctx.exception.retcode, -errno.EINVAL)
+ self.assertEqual(str(ctx.exception), 'Please specify the certificate key with "-i" option')
+
+ result = self.exec_cmd('set-ssl-certificate-key', inbuf='content', mgr_id='x')
+ self.assertEqual(result, 'SSL certificate key updated')
+
+ def test_set_mgr_created_self_signed_cert(self):
+ result = self.exec_cmd('create-self-signed-cert')
+ self.assertEqual(result, 'Self-signed certificate created')
import errno
import unittest
-from ..services.sso import handle_sso_command, load_sso_db
-from . import CmdException # pylint: disable=no-name-in-module
-from . import KVStoreMockMixin # pylint: disable=no-name-in-module
-from . import exec_dashboard_cmd # pylint: disable=no-name-in-module
+from ..services.sso import load_sso_db
+from . import CLICommandTestMixin, CmdException # pylint: disable=no-name-in-module
-class AccessControlTest(unittest.TestCase, KVStoreMockMixin):
+class AccessControlTest(unittest.TestCase, CLICommandTestMixin):
IDP_METADATA = '''<?xml version="1.0"?>
<md:EntityDescriptor xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata"
xmlns:ds="http://www.w3.org/2000/09/xmldsig#"
self.mock_kv_store()
load_sso_db()
- @classmethod
- def exec_cmd(cls, cmd, **kwargs):
- return exec_dashboard_cmd(handle_sso_command, cmd, **kwargs)
-
def validate_onelogin_settings(self, onelogin_settings, ceph_dashboard_base_url, uid,
sp_x509cert, sp_private_key, signature_enabled):
self.assertIn('sp', onelogin_settings)