# -*- coding: utf-8 -*-
+# pylint: disable=too-many-arguments
from __future__ import absolute_import
import json
+import threading
+import time
+
+import cherrypy
+from cherrypy._cptools import HandlerWrapperTool
+from cherrypy.test import helper
from mgr_module import CLICommand, MgrModule
-from .. import mgr
+
+from .. import logger, mgr
+from ..controllers import json_error_page, generate_controller_routes
+from ..services.auth import AuthManagerTool
+from ..services.exception import dashboard_exception_handler
class CmdException(Exception):
return json.loads(out)
except ValueError:
return out
+
+
+class KVStoreMockMixin(object):
+ CONFIG_KEY_DICT = {}
+
+ @classmethod
+ def mock_set_module_option(cls, attr, val):
+ cls.CONFIG_KEY_DICT[attr] = val
+
+ @classmethod
+ def mock_get_module_option(cls, attr, default=None):
+ return cls.CONFIG_KEY_DICT.get(attr, default)
+
+ @classmethod
+ def mock_kv_store(cls):
+ cls.CONFIG_KEY_DICT.clear()
+ mgr.set_module_option.side_effect = cls.mock_set_module_option
+ mgr.get_module_option.side_effect = cls.mock_get_module_option
+ # kludge below
+ mgr.set_store.side_effect = cls.mock_set_module_option
+ mgr.get_store.side_effect = cls.mock_get_module_option
+
+ @classmethod
+ def get_key(cls, key):
+ return cls.CONFIG_KEY_DICT.get(key, None)
+
+
+class CLICommandTestMixin(KVStoreMockMixin):
+ @classmethod
+ def exec_cmd(cls, cmd, **kwargs):
+ return exec_dashboard_cmd(None, cmd, **kwargs)
+
+
+class ControllerTestCase(helper.CPWebCase):
+ @classmethod
+ def setup_controllers(cls, ctrl_classes, base_url=''):
+ if not isinstance(ctrl_classes, list):
+ ctrl_classes = [ctrl_classes]
+ mapper = cherrypy.dispatch.RoutesDispatcher()
+ endpoint_list = []
+ for ctrl in ctrl_classes:
+ inst = ctrl()
+ for endpoint in ctrl.endpoints():
+ endpoint.inst = inst
+ endpoint_list.append(endpoint)
+ endpoint_list = sorted(endpoint_list, key=lambda e: e.url)
+ for endpoint in endpoint_list:
+ generate_controller_routes(endpoint, mapper, base_url)
+ if base_url == '':
+ base_url = '/'
+ cherrypy.tree.mount(None, config={
+ base_url: {'request.dispatch': mapper}})
+
+ def __init__(self, *args, **kwargs):
+ cherrypy.tools.authenticate = AuthManagerTool()
+ cherrypy.tools.dashboard_exception_handler = HandlerWrapperTool(dashboard_exception_handler,
+ priority=31)
+ cherrypy.config.update({
+ 'error_page.default': json_error_page,
+ 'tools.json_in.on': True,
+ 'tools.json_in.force': False
+ })
+ super(ControllerTestCase, self).__init__(*args, **kwargs)
+
+ def _request(self, url, method, data=None):
+ if not data:
+ b = None
+ h = None
+ else:
+ b = json.dumps(data)
+ h = [('Content-Type', 'application/json'),
+ ('Content-Length', str(len(b)))]
+ self.getPage(url, method=method, body=b, headers=h)
+
+ def _get(self, url):
+ self._request(url, 'GET')
+
+ def _post(self, url, data=None):
+ self._request(url, 'POST', data)
+
+ def _delete(self, url, data=None):
+ self._request(url, 'DELETE', data)
+
+ def _put(self, url, data=None):
+ self._request(url, 'PUT', data)
+
+ def _task_request(self, method, url, data, timeout):
+ self._request(url, method, data)
+ if self.status != '202 Accepted':
+ logger.info("task finished immediately")
+ return
+
+ res = self.jsonBody()
+ self.assertIsInstance(res, dict)
+ self.assertIn('name', res)
+ self.assertIn('metadata', res)
+
+ task_name = res['name']
+ task_metadata = res['metadata']
+
+ # pylint: disable=protected-access
+ class Waiter(threading.Thread):
+ def __init__(self, task_name, task_metadata, tc):
+ super(Waiter, self).__init__()
+ self.task_name = task_name
+ self.task_metadata = task_metadata
+ self.ev = threading.Event()
+ self.abort = False
+ self.res_task = None
+ self.tc = tc
+
+ def run(self):
+ running = True
+ while running and not self.abort:
+ logger.info("task (%s, %s) is still executing", self.task_name,
+ self.task_metadata)
+ time.sleep(1)
+ self.tc._get('/api/task?name={}'.format(self.task_name))
+ res = self.tc.jsonBody()
+ for task in res['finished_tasks']:
+ if task['metadata'] == self.task_metadata:
+ # task finished
+ running = False
+ self.res_task = task
+ self.ev.set()
+
+ thread = Waiter(task_name, task_metadata, self)
+ thread.start()
+ status = thread.ev.wait(timeout)
+ if not status:
+ # timeout expired
+ thread.abort = True
+ thread.join()
+ raise Exception("Waiting for task ({}, {}) to finish timed out"
+ .format(task_name, task_metadata))
+ logger.info("task (%s, %s) finished", task_name, task_metadata)
+ if thread.res_task['success']:
+ self.body = json.dumps(thread.res_task['ret_value'])
+ if method == 'POST':
+ self.status = '201 Created'
+ elif method == 'PUT':
+ self.status = '200 OK'
+ elif method == 'DELETE':
+ self.status = '204 No Content'
+ return
+ else:
+ if 'status' in thread.res_task['exception']:
+ self.status = thread.res_task['exception']['status']
+ else:
+ self.status = 500
+ self.body = json.dumps(thread.res_task['exception'])
+ return
+
+ def _task_post(self, url, data=None, timeout=60):
+ self._task_request('POST', url, data, timeout)
+
+ def _task_delete(self, url, timeout=60):
+ self._task_request('DELETE', url, None, timeout)
+
+ def _task_put(self, url, data=None, timeout=60):
+ self._task_request('PUT', url, data, timeout)
+
+ def jsonBody(self):
+ body_str = self.body.decode('utf-8') if isinstance(self.body, bytes) else self.body
+ return json.loads(body_str)
+
+ def assertJsonBody(self, data, msg=None):
+ """Fail if value != self.body."""
+ json_body = self.jsonBody()
+ if data != json_body:
+ if msg is None:
+ msg = 'expected body:\n%r\n\nactual body:\n%r' % (
+ data, json_body)
+ self._handlewebError(msg)
+
+ def assertInJsonBody(self, data, msg=None):
+ json_body = self.jsonBody()
+ if data not in json_body:
+ if msg is None:
+ msg = 'expected %r to be in %r' % (data, json_body)
+ self._handlewebError(msg)
+++ /dev/null
-# -*- coding: utf-8 -*-
-# pylint: disable=too-many-arguments
-from __future__ import absolute_import
-
-import json
-import threading
-import time
-
-import cherrypy
-from cherrypy._cptools import HandlerWrapperTool
-from cherrypy.test import helper
-
-from . import exec_dashboard_cmd
-from .. import logger, mgr
-from ..controllers import json_error_page, generate_controller_routes
-from ..services.auth import AuthManagerTool
-from ..services.exception import dashboard_exception_handler
-
-
-class KVStoreMockMixin(object):
- CONFIG_KEY_DICT = {}
-
- @classmethod
- def mock_set_module_option(cls, attr, val):
- cls.CONFIG_KEY_DICT[attr] = val
-
- @classmethod
- def mock_get_module_option(cls, attr, default=None):
- return cls.CONFIG_KEY_DICT.get(attr, default)
-
- @classmethod
- def mock_kv_store(cls):
- cls.CONFIG_KEY_DICT.clear()
- mgr.set_module_option.side_effect = cls.mock_set_module_option
- mgr.get_module_option.side_effect = cls.mock_get_module_option
- # kludge below
- mgr.set_store.side_effect = cls.mock_set_module_option
- mgr.get_store.side_effect = cls.mock_get_module_option
-
- @classmethod
- def get_key(cls, key):
- return cls.CONFIG_KEY_DICT.get(key, None)
-
-
-class CLICommandTestMixin(KVStoreMockMixin):
- @classmethod
- def exec_cmd(cls, cmd, **kwargs):
- return exec_dashboard_cmd(None, cmd, **kwargs)
-
-
-class ControllerTestCase(helper.CPWebCase):
- @classmethod
- def setup_controllers(cls, ctrl_classes, base_url=''):
- if not isinstance(ctrl_classes, list):
- ctrl_classes = [ctrl_classes]
- mapper = cherrypy.dispatch.RoutesDispatcher()
- endpoint_list = []
- for ctrl in ctrl_classes:
- inst = ctrl()
- for endpoint in ctrl.endpoints():
- endpoint.inst = inst
- endpoint_list.append(endpoint)
- endpoint_list = sorted(endpoint_list, key=lambda e: e.url)
- for endpoint in endpoint_list:
- generate_controller_routes(endpoint, mapper, base_url)
- if base_url == '':
- base_url = '/'
- cherrypy.tree.mount(None, config={
- base_url: {'request.dispatch': mapper}})
-
- def __init__(self, *args, **kwargs):
- cherrypy.tools.authenticate = AuthManagerTool()
- cherrypy.tools.dashboard_exception_handler = HandlerWrapperTool(dashboard_exception_handler,
- priority=31)
- cherrypy.config.update({
- 'error_page.default': json_error_page,
- 'tools.json_in.on': True,
- 'tools.json_in.force': False
- })
- super(ControllerTestCase, self).__init__(*args, **kwargs)
-
- def _request(self, url, method, data=None):
- if not data:
- b = None
- h = None
- else:
- b = json.dumps(data)
- h = [('Content-Type', 'application/json'),
- ('Content-Length', str(len(b)))]
- self.getPage(url, method=method, body=b, headers=h)
-
- def _get(self, url):
- self._request(url, 'GET')
-
- def _post(self, url, data=None):
- self._request(url, 'POST', data)
-
- def _delete(self, url, data=None):
- self._request(url, 'DELETE', data)
-
- def _put(self, url, data=None):
- self._request(url, 'PUT', data)
-
- def _task_request(self, method, url, data, timeout):
- self._request(url, method, data)
- if self.status != '202 Accepted':
- logger.info("task finished immediately")
- return
-
- res = self.jsonBody()
- self.assertIsInstance(res, dict)
- self.assertIn('name', res)
- self.assertIn('metadata', res)
-
- task_name = res['name']
- task_metadata = res['metadata']
-
- # pylint: disable=protected-access
- class Waiter(threading.Thread):
- def __init__(self, task_name, task_metadata, tc):
- super(Waiter, self).__init__()
- self.task_name = task_name
- self.task_metadata = task_metadata
- self.ev = threading.Event()
- self.abort = False
- self.res_task = None
- self.tc = tc
-
- def run(self):
- running = True
- while running and not self.abort:
- logger.info("task (%s, %s) is still executing", self.task_name,
- self.task_metadata)
- time.sleep(1)
- self.tc._get('/api/task?name={}'.format(self.task_name))
- res = self.tc.jsonBody()
- for task in res['finished_tasks']:
- if task['metadata'] == self.task_metadata:
- # task finished
- running = False
- self.res_task = task
- self.ev.set()
-
- thread = Waiter(task_name, task_metadata, self)
- thread.start()
- status = thread.ev.wait(timeout)
- if not status:
- # timeout expired
- thread.abort = True
- thread.join()
- raise Exception("Waiting for task ({}, {}) to finish timed out"
- .format(task_name, task_metadata))
- logger.info("task (%s, %s) finished", task_name, task_metadata)
- if thread.res_task['success']:
- self.body = json.dumps(thread.res_task['ret_value'])
- if method == 'POST':
- self.status = '201 Created'
- elif method == 'PUT':
- self.status = '200 OK'
- elif method == 'DELETE':
- self.status = '204 No Content'
- return
- else:
- if 'status' in thread.res_task['exception']:
- self.status = thread.res_task['exception']['status']
- else:
- self.status = 500
- self.body = json.dumps(thread.res_task['exception'])
- return
-
- def _task_post(self, url, data=None, timeout=60):
- self._task_request('POST', url, data, timeout)
-
- def _task_delete(self, url, timeout=60):
- self._task_request('DELETE', url, None, timeout)
-
- def _task_put(self, url, data=None, timeout=60):
- self._task_request('PUT', url, data, timeout)
-
- def jsonBody(self):
- body_str = self.body.decode('utf-8') if isinstance(self.body, bytes) else self.body
- return json.loads(body_str)
-
- def assertJsonBody(self, data, msg=None):
- """Fail if value != self.body."""
- json_body = self.jsonBody()
- if data != json_body:
- if msg is None:
- msg = 'expected body:\n%r\n\nactual body:\n%r' % (
- data, json_body)
- self._handlewebError(msg)
-
- def assertInJsonBody(self, data, msg=None):
- json_body = self.jsonBody()
- if data not in json_body:
- if msg is None:
- msg = 'expected %r to be in %r' % (data, json_body)
- self._handlewebError(msg)
import time
import unittest
-from . import CmdException
-from .helper import CLICommandTestMixin
+from . import CmdException, CLICommandTestMixin
from .. import mgr
from ..security import Scope, Permission
from ..services.access_control import load_access_control_db, \
import cherrypy
import mock
-from .helper import ControllerTestCase, KVStoreMockMixin
+from . import ControllerTestCase, KVStoreMockMixin
from ..controllers import RESTController, Controller
from ..tools import RequestLoggingTool
from .. import mgr
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-from .helper import ControllerTestCase
+from . import ControllerTestCase
from ..controllers import BaseController, RESTController, Controller, \
ApiController, Endpoint
# # -*- coding: utf-8 -*-
from __future__ import absolute_import
-from .helper import ControllerTestCase
+from . import ControllerTestCase
from ..controllers import RESTController, ApiController, Endpoint, EndpointDoc, ControllerDoc
from ..controllers.docs import Docs
# -*- coding: utf-8 -*-
from .. import mgr
-from .helper import ControllerTestCase
+from . import ControllerTestCase
from ..controllers.erasure_code_profile import ErasureCodeProfile
import time
import rados
+
+from . import ControllerTestCase
from ..services.ceph_service import SendCommandError
from ..controllers import RESTController, Controller, Task, Endpoint
-from .helper import ControllerTestCase
from ..services.exception import handle_rados_error, handle_send_command_error, \
serialize_dashboard_exception
from ..tools import ViewCache, TaskManager, NotificationQueue
import unittest
from mock import Mock, patch
-from .helper import KVStoreMockMixin
+from . import KVStoreMockMixin
from ..plugins.feature_toggles import FeatureToggles, Features
from mock import MagicMock, Mock
-from .helper import KVStoreMockMixin
+from . import KVStoreMockMixin
from .. import mgr
from ..settings import Settings
from ..services import ganesha
-from .helper import ControllerTestCase
+from . import ControllerTestCase
from ..controllers.grafana import Grafana
from .. import mgr
import json
import mock
-from . import CmdException
-from .helper import ControllerTestCase, CLICommandTestMixin
+from . import CmdException, ControllerTestCase, CLICommandTestMixin
from .. import mgr
from ..controllers.iscsi import Iscsi, IscsiTarget
from ..services.iscsi_client import IscsiClient
# -*- coding: utf-8 -*-
+from . import ControllerTestCase
from .. import mgr
from ..controllers import BaseController, Controller
from ..controllers.prometheus import Prometheus, PrometheusReceiver
-from .helper import ControllerTestCase
-
@Controller('alertmanager/mocked/api/v1/alerts', secure=False)
class AlertManagerMockInstance(BaseController):
import json
import mock
+from . import ControllerTestCase
from .. import mgr
from ..controllers.summary import Summary
from ..controllers.rbd_mirroring import RbdMirroringSummary
-from .helper import ControllerTestCase
mock_list_servers = [{
import time
-from .helper import ControllerTestCase
+from . import ControllerTestCase
from ..controllers import Controller, RESTController, Task
from ..controllers.task import Task as TaskController
from ..tools import NotificationQueue, TaskManager
import mock
-from .helper import ControllerTestCase
+from . import ControllerTestCase
from ..controllers.rgw import RgwUser
import errno
import unittest
-from .helper import KVStoreMockMixin
+from . import KVStoreMockMixin, ControllerTestCase
from .. import settings
from ..controllers.settings import Settings as SettingsController
from ..settings import Settings, handle_option_command
-from .helper import ControllerTestCase
class SettingsTest(unittest.TestCase, KVStoreMockMixin):
import errno
import unittest
-from . import CmdException, exec_dashboard_cmd
-from .helper import CLICommandTestMixin
+from . import CmdException, exec_dashboard_cmd, KVStoreMockMixin
from ..services.sso import handle_sso_command, load_sso_db
-class AccessControlTest(unittest.TestCase, CLICommandTestMixin):
+class AccessControlTest(unittest.TestCase, KVStoreMockMixin):
IDP_METADATA = '''<?xml version="1.0"?>
<md:EntityDescriptor xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata"
xmlns:ds="http://www.w3.org/2000/09/xmldsig#"
from __future__ import absolute_import
+from . import ControllerTestCase
from .. import mgr
from ..controllers.tcmu_iscsi import TcmuIscsi
-from .helper import ControllerTestCase
mocked_servers = [{
'ceph_version': 'ceph version 13.0.0-5083- () mimic (dev)',
from cherrypy.lib.sessions import RamSession
from mock import patch
+from . import ControllerTestCase
from ..services.exception import handle_rados_error
-from .helper import ControllerTestCase
from ..controllers import RESTController, ApiController, Controller, \
BaseController, Proxy
from ..tools import is_valid_ipv6_address, dict_contains_path, \