# -*- coding: utf-8 -*-
-# pylint: disable=W0212,too-many-return-statements
+# pylint: disable=W0212,too-many-return-statements,too-many-public-methods
from __future__ import absolute_import
import json
import requests
from tasks.mgr.mgr_test_case import MgrTestCase
-from teuthology.exceptions import CommandFailedError
+from teuthology.exceptions import \
+ CommandFailedError # pylint: disable=import-error
log = logging.getLogger(__name__)
REQUIRE_FILESYSTEM = True
CLIENTS_REQUIRED = 1
CEPHFS = False
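+ # Subclasses set this to True to have setUpClass load and seed the test_orchestrator backend.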
+ ORCHESTRATOR = False
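+ # Fake inventory and daemons served by the test_orchestrator backend once it is loaded.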
+ ORCHESTRATOR_TEST_DATA = {
+ 'inventory': [
+ {
+ 'name': 'test-host0',
+ 'addr': '1.2.3.4',
+ 'devices': [
+ {
+ 'path': '/dev/sda',
+ }
+ ]
+ },
+ {
+ 'name': 'test-host1',
+ 'addr': '1.2.3.5',
+ 'devices': [
+ {
+ 'path': '/dev/sdb',
+ }
+ ]
+ }
+ ],
+ 'daemons': [
+ {
+ 'nodename': 'test-host0',
+ 'daemon_type': 'mon',
+ 'daemon_id': 'a'
+ },
+ {
+ 'nodename': 'test-host0',
+ 'daemon_type': 'mgr',
+ 'daemon_id': 'x'
+ },
+ {
+ 'nodename': 'test-host0',
+ 'daemon_type': 'osd',
+ 'daemon_id': '0'
+ },
+ {
+ 'nodename': 'test-host1',
+ 'daemon_type': 'osd',
+ 'daemon_id': '1'
+ }
+ ]
+ }
_session = None # type: requests.sessions.Session
_token = None
@classmethod
def create_user(cls, username, password, roles=None,
force_password=True, cmd_args=None):
+ # pylint: disable=too-many-arguments
"""
:param username: The name of the user.
:type username: str
set_roles_args.append(rolename)
cls._ceph_cmd(set_roles_args)
+ @classmethod
+ def create_pool(cls, name, pg_num, pool_type, application='rbd'):
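+ """Create a pool via the dashboard API; erasure pools get ec_overwrites enabled."""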
+ data = {
+ 'pool': name,
+ 'pg_num': pg_num,
+ 'pool_type': pool_type,
+ 'application_metadata': [application]
+ }
+ if pool_type == 'erasure':
+ data['flags'] = ['ec_overwrites']
+ cls._task_post("/api/pool", data)
+
@classmethod
def login(cls, username, password):
if cls._loggedin:
@classmethod
def RunAs(cls, username, password, roles=None, force_password=True,
cmd_args=None, login=True):
+ # pylint: disable=too-many-arguments
def wrapper(func):
def execute(self, *args, **kwargs):
self.create_user(username, password, roles,
# wait for mds restart to complete...
cls.fs.wait_for_daemons()
+ if cls.ORCHESTRATOR:
+ cls._load_module("test_orchestrator")
+
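+ # Point the orchestrator at the test backend, then feed it the fixture data.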
+ cmd = ['orch', 'set', 'backend', 'test_orchestrator']
+ cls.mgr_cluster.mon_manager.raw_cluster_cmd(*cmd)
+
+ cmd = ['test_orchestrator', 'load_data', '-i', '-']
+ cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd, stdin=json.dumps(
+ cls.ORCHESTRATOR_TEST_DATA
+ ))
+
cls._token = None
cls._session = requests.Session()
cls._resp = None
elif method == 'DELETE':
cls._resp.status_code = 204
return res_task['ret_value']
+
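+ # Task failed: derive the HTTP status from the backend exception, defaulting to 500.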
+ if 'status' in res_task['exception']:
+ cls._resp.status_code = res_task['exception']['status']
else:
- if 'status' in res_task['exception']:
- cls._resp.status_code = res_task['exception']['status']
- else:
- cls._resp.status_code = 500
- return res_task['exception']
+ cls._resp.status_code = 500
+ return res_task['exception']
@classmethod
def _task_post(cls, url, data=None, timeout=60):
JAny = namedtuple('JAny', ['none'])
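+# Schema of a single mgr module option, shared by the health and mgr module tests.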
+module_options_object_schema = JObj({
+ 'name': str,
+ 'type': str,
+ 'level': str,
+ 'flags': int,
+ 'default_value': JAny(none=True),
+ 'min': JAny(none=False),
+ 'max': JAny(none=False),
+ 'enum_allowed': JList(str),
+ 'see_also': JList(str),
+ 'desc': str,
+ 'long_desc': str,
+ 'tags': JList(str),
+})
+
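+# Module options are returned keyed by option name, so any key is allowed here.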
+module_options_schema = JObj(
+ {},
+ allow_unknown=True,
+ unknown_schema=module_options_object_schema)
+
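+# Address vector entries as they appear in the mgr map and in log entries.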
+addrvec_schema = JList(JObj({
+ 'addr': str,
+ 'nonce': int,
+ 'type': str
+}))
+
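+# Device report format shared by the host and OSD device endpoints.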
+devices_schema = JList(JObj({
+ 'daemons': JList(str),
+ 'devid': str,
+ 'location': JList(JObj({
+ 'host': str,
+ 'dev': str,
+ 'path': str
+ }))
+}))
+
class _ValError(Exception):
def __init__(self, msg, path):
super(_ValError, self).__init__('In `input{}`: {}'.format(path_str, msg))
-# pylint: disable=dangerous-default-value,inconsistent-return-statements
+# pylint: disable=dangerous-default-value,inconsistent-return-statements,too-many-branches
def _validate_json(val, schema, path=[]):
"""
>>> d = {'a': 1, 'b': 'x', 'c': range(10)}
if isinstance(schema, JObj):
if val is None and schema.none:
return True
- elif val is None:
+ if val is None:
raise _ValError('val is None', path)
if not hasattr(val, 'keys'):
raise _ValError('val="{}" is not a dict'.format(val), path)
# -*- coding: utf-8 -*-
+# pylint: disable=too-many-public-methods
+
from contextlib import contextmanager
from .helper import DashboardTestCase, JLeaf, JList, JObj
CEPHFS = True
AUTH_ROLES = ['pool-manager', 'ganesha-manager']
- @classmethod
- def create_pool(cls, name, pg_num, pool_type, application='rbd'):
- data = {
- 'pool': name,
- 'pg_num': pg_num,
- 'pool_type': pool_type,
- 'application_metadata': [application]
- }
- if pool_type == 'erasure':
- data['flags'] = ['ec_overwrites']
- cls._task_post("/api/pool", data)
-
@classmethod
def setUpClass(cls):
super(GaneshaTest, cls).setUpClass()
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-from .helper import DashboardTestCase, JAny, JLeaf, JList, JObj
+from .helper import (DashboardTestCase, JAny, JLeaf, JList, JObj,
+ addrvec_schema, module_options_schema)
class HealthTest(DashboardTestCase):
'can_run': bool,
'error_string': str,
'name': str,
- 'module_options': JObj(
- {},
- allow_unknown=True,
- unknown_schema=JObj({
- 'name': str,
- 'type': str,
- 'level': str,
- 'flags': int,
- 'default_value': str,
- 'min': str,
- 'max': str,
- 'enum_allowed': JList(str),
- 'see_also': JList(str),
- 'desc': str,
- 'long_desc': str,
- 'tags': JList(str),
- })),
+ 'module_options': module_options_schema
})
schema = JObj({
'client_perf': JObj({
'mgr_map': JObj({
'active_addr': str,
'active_addrs': JObj({
- 'addrvec': JList(JObj({
- 'addr': str,
- 'nonce': int,
- 'type': str
- }))
+ 'addrvec': addrvec_schema
}),
'active_change': str, # timestamp
'active_mgr_features': int,
'required_mon': JList(str)
}),
'monmap': JObj({
- # TODO: expand on monmap schema
+ # @TODO: expand on monmap schema
'mons': JList(JLeaf(dict)),
}, allow_unknown=True),
'name': str,
'quorum_age': int,
'rank': int,
'state': str,
- # TODO: What type should be expected here?
+ # @TODO: What type should be expected here?
'sync_provider': JList(JAny(none=True))
}),
'osd_map': JObj({
- # TODO: define schema for crush map and osd_metadata, among
+ # @TODO: define schema for crush map and osd_metadata, among
# others
'osds': JList(
JObj({
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-import json
-
-from .helper import DashboardTestCase, JList, JObj
-from .test_orchestrator import test_data
+from .helper import DashboardTestCase, JList, JObj, devices_schema
class HostControllerTest(DashboardTestCase):
URL_HOST = '/api/host'
+ ORCHESTRATOR = True
+
@classmethod
def setUpClass(cls):
super(HostControllerTest, cls).setUpClass()
- cls._load_module("test_orchestrator")
-
- cmd = ['orch', 'set', 'backend', 'test_orchestrator']
- cls.mgr_cluster.mon_manager.raw_cluster_cmd(*cmd)
-
- cmd = ['test_orchestrator', 'load_data', '-i', '-']
- cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd, stdin=json.dumps(test_data))
@classmethod
def tearDownClass(cls):
data = self._get(self.URL_HOST)
self.assertStatus(200)
- orch_hostnames = {inventory_node['name'] for inventory_node in test_data['inventory']}
+ orch_hostnames = {inventory_node['name'] for inventory_node in
+ self.ORCHESTRATOR_TEST_DATA['inventory']}
for server in data:
self.assertIn('services', server)
def test_host_list_with_sources(self):
data = self._get('{}?sources=orchestrator'.format(self.URL_HOST))
self.assertStatus(200)
- test_hostnames = {inventory_node['name'] for inventory_node in test_data['inventory']}
+ test_hostnames = {inventory_node['name'] for inventory_node in
+ self.ORCHESTRATOR_TEST_DATA['inventory']}
resp_hostnames = {host['hostname'] for host in data}
self.assertEqual(test_hostnames, resp_hostnames)
data = self._get('{}?sources=ceph'.format(self.URL_HOST))
self.assertStatus(200)
- test_hostnames = {inventory_node['name'] for inventory_node in test_data['inventory']}
+ test_hostnames = {inventory_node['name'] for inventory_node in
+ self.ORCHESTRATOR_TEST_DATA['inventory']}
resp_hostnames = {host['hostname'] for host in data}
self.assertEqual(len(test_hostnames.intersection(resp_hostnames)), 0)
assert hosts[0]
data = self._get('{}/devices'.format('{}/{}'.format(self.URL_HOST, hosts[0])))
self.assertStatus(200)
- self.assertSchema(data, JList(JObj({
- 'daemons': JList(str),
- 'devid': str,
- 'location': JList(JObj({
- 'host': str,
- 'dev': str,
- 'path': str
- }))
- })))
+ self.assertSchema(data, devices_schema)
def test_host_daemons(self):
hosts = self._get('{}'.format(self.URL_HOST))
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-from .helper import DashboardTestCase, JList, JObj
+from .helper import DashboardTestCase, JList, JObj, addrvec_schema
class LogsTest(DashboardTestCase):
self.assertStatus(200)
log_entry_schema = JList(JObj({
'addrs': JObj({
- 'addrvec': JList(JObj({
- 'addr': str,
- 'nonce': int,
- 'type': str
- }))
+ 'addrvec': addrvec_schema
}),
'channel': str,
'message': str,
import requests
-from .helper import DashboardTestCase, JAny, JLeaf, JList, JObj
+from .helper import (DashboardTestCase, JLeaf, JList, JObj,
+ module_options_object_schema, module_options_schema)
logger = logging.getLogger(__name__)
class MgrModuleTest(MgrModuleTestCase):
- __options_schema = JObj({
- 'name': str,
- 'type': str,
- 'level': str,
- 'flags': int,
- 'default_value': JAny(none=True),
- 'min': JAny(none=False),
- 'max': JAny(none=False),
- 'enum_allowed': JList(str),
- 'desc': str,
- 'long_desc': str,
- 'tags': JList(str),
- 'see_also': JList(str)
- })
-
def test_list_disabled_module(self):
self._ceph_cmd(['mgr', 'module', 'disable', 'iostat'])
self.wait_until_rest_api_accessible()
'name': JLeaf(str),
'enabled': JLeaf(bool),
'always_on': JLeaf(bool),
- 'options': JObj(
- {},
- allow_unknown=True,
- unknown_schema=JObj({
- 'name': str,
- 'type': str,
- 'level': str,
- 'flags': int,
- 'default_value': JAny(none=True),
- 'min': JAny(none=False),
- 'max': JAny(none=False),
- 'enum_allowed': JList(str),
- 'see_also': JList(str),
- 'desc': str,
- 'long_desc': str,
- 'tags': JList(str)
- }))
+ 'options': module_options_schema
})))
module_info = self.find_object_in_list('name', 'iostat', data)
self.assertIsNotNone(module_info)
'name': JLeaf(str),
'enabled': JLeaf(bool),
'always_on': JLeaf(bool),
- 'options': JObj(
- {},
- allow_unknown=True,
- unknown_schema=JObj({
- 'name': str,
- 'type': str,
- 'level': str,
- 'flags': int,
- 'default_value': JAny(none=True),
- 'min': JAny(none=False),
- 'max': JAny(none=False),
- 'enum_allowed': JList(str),
- 'see_also': JList(str),
- 'desc': str,
- 'long_desc': str,
- 'tags': JList(str)
- }))
+ 'options': module_options_schema
})))
module_info = self.find_object_in_list('name', 'iostat', data)
self.assertIsNotNone(module_info)
data = self._get('/api/mgr/module/telemetry/options')
self.assertStatus(200)
schema = JObj({
- 'channel_basic': self.__options_schema,
- 'channel_crash': self.__options_schema,
- 'channel_device': self.__options_schema,
- 'channel_ident': self.__options_schema,
- 'contact': self.__options_schema,
- 'description': self.__options_schema,
- 'device_url': self.__options_schema,
- 'enabled': self.__options_schema,
- 'interval': self.__options_schema,
- 'last_opt_revision': self.__options_schema,
- 'leaderboard': self.__options_schema,
- 'log_level': self.__options_schema,
- 'log_to_cluster': self.__options_schema,
- 'log_to_cluster_level': self.__options_schema,
- 'log_to_file': self.__options_schema,
- 'organization': self.__options_schema,
- 'proxy': self.__options_schema,
- 'url': self.__options_schema
+ 'channel_basic': module_options_object_schema,
+ 'channel_crash': module_options_object_schema,
+ 'channel_device': module_options_object_schema,
+ 'channel_ident': module_options_object_schema,
+ 'contact': module_options_object_schema,
+ 'description': module_options_object_schema,
+ 'device_url': module_options_object_schema,
+ 'enabled': module_options_object_schema,
+ 'interval': module_options_object_schema,
+ 'last_opt_revision': module_options_object_schema,
+ 'leaderboard': module_options_object_schema,
+ 'log_level': module_options_object_schema,
+ 'log_to_cluster': module_options_object_schema,
+ 'log_to_cluster_level': module_options_object_schema,
+ 'log_to_file': module_options_object_schema,
+ 'organization': module_options_object_schema,
+ 'proxy': module_options_object_schema,
+ 'url': module_options_object_schema
})
self.assertSchema(data, schema)
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-import json
-
from .helper import DashboardTestCase
-test_data = {
- 'inventory': [
- {
- 'name': 'test-host0',
- 'addr': '1.2.3.4',
- 'devices': [
- {
- 'path': '/dev/sda',
- }
- ]
- },
- {
- 'name': 'test-host1',
- 'addr': '1.2.3.5',
- 'devices': [
- {
- 'path': '/dev/sdb',
- }
- ]
- }
- ],
- 'daemons': [
- {
- 'nodename': 'test-host0',
- 'daemon_type': 'mon',
- 'daemon_id': 'a'
- },
- {
- 'nodename': 'test-host0',
- 'daemon_type': 'mgr',
- 'daemon_id': 'x'
- },
- {
- 'nodename': 'test-host0',
- 'daemon_type': 'osd',
- 'daemon_id': '0'
- },
- {
- 'nodename': 'test-host1',
- 'daemon_type': 'osd',
- 'daemon_id': '1'
- }
- ]
-}
-
class OrchestratorControllerTest(DashboardTestCase):
URL_INVENTORY = '/api/orchestrator/inventory'
URL_OSD = '/api/orchestrator/osd'
+ ORCHESTRATOR = True
+
@property
def test_data_inventory(self):
- return test_data['inventory']
+ return self.ORCHESTRATOR_TEST_DATA['inventory']
@property
def test_data_daemons(self):
- return test_data['daemons']
+ return self.ORCHESTRATOR_TEST_DATA['daemons']
@classmethod
def setUpClass(cls):
super(OrchestratorControllerTest, cls).setUpClass()
- cls._load_module('test_orchestrator')
- cmd = ['orch', 'set', 'backend', 'test_orchestrator']
- cls.mgr_cluster.mon_manager.raw_cluster_cmd(*cmd)
-
- cmd = ['test_orchestrator', 'load_data', '-i', '-']
- cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd, stdin=json.dumps(test_data))
@classmethod
def tearDownClass(cls):
import json
-from .helper import DashboardTestCase, JAny, JLeaf, JList, JObj, JTuple
+from .helper import (DashboardTestCase, JAny, JLeaf, JList, JObj, JTuple,
+ devices_schema)
class OsdTest(DashboardTestCase):
data = self._get('/api/osd/safe_to_delete?svc_ids=0')
self.assertStatus(200)
self.assertSchema(data, JObj({
-             'is_safe_to_delete': JAny(none=True),
-             'message': str
-             }))
+            'is_safe_to_delete': JAny(none=True),
+            'message': str
+        }))
self.assertTrue(data['is_safe_to_delete'])
def test_osd_smart(self):
response = self.jsonBody()
if 'osd_map' in response and 'weight' in response['osd_map']:
return round(response['osd_map']['weight'], 1)
+ return None
self.wait_until_equal(get_reweight_value, 0.4, 10)
self.assertStatus(200)
self._post('/api/osd/0/reweight', {'weight': 1})
def test_create_lost_destroy_remove(self):
+ sample_data = {
+ 'uuid': 'f860ca2e-757d-48ce-b74a-87052cad563f',
+ 'svc_id': 5
+ }
+
# Create
self._task_post('/api/osd', {
'method': 'bare',
- 'data': {
- 'uuid': 'f860ca2e-757d-48ce-b74a-87052cad563f',
- 'svc_id': 5
- },
+ 'data': sample_data,
'tracking_id': 'bare-5'
})
self.assertStatus(201)
def test_osd_devices(self):
data = self._get('/api/osd/0/devices')
self.assertStatus(200)
- self.assertSchema(data, JList(JObj({
- 'daemons': JList(str),
- 'devid': str,
- 'location': JList(JObj({
- 'host': str,
- 'dev': str,
- 'path': str
- }))
- })))
+ self.assertSchema(data, devices_schema)
class OsdFlagsTest(DashboardTestCase):
self.assertStatus(204)
def _validate_pool_properties(self, data, pool, timeout=DashboardTestCase.TIMEOUT_HEALTH_CLEAR):
+ # pylint: disable=too-many-branches
for prop, value in data.items():
if prop == 'pool_type':
self.assertEqual(pool['type'], value)
data = self._get('/api/pool/{}/configuration'.format(pool_name))
self.assertStatus(200)
self.assertSchema(data, JList(JObj({
-             'name': str,
-             'value': str,
-             'source': int
-             })))
+            'name': str,
+            'value': str,
+            'source': int
+        })))
def test_pool_list(self):
data = self._get("/api/pool")
class RbdTest(DashboardTestCase):
AUTH_ROLES = ['pool-manager', 'block-manager', 'cluster-manager']
- @classmethod
- def create_pool(cls, name, pg_num, pool_type, application='rbd'):
- data = {
- 'pool': name,
- 'pg_num': pg_num,
- 'pool_type': pool_type,
- 'application_metadata': [application]
- }
- if pool_type == 'erasure':
- data['flags'] = ['ec_overwrites']
- cls._task_post("/api/pool", data)
-
@DashboardTestCase.RunAs('test', 'test', [{'rbd-image': ['create', 'update', 'delete']}])
def test_read_access_permissions(self):
self._get('/api/block/image')
def get_trash(cls, pool, image_id):
trash = cls._get('/api/block/image/trash/?pool_name={}'.format(pool))
if isinstance(trash, list):
- for pool in trash:
- for image in pool['value']:
+ for trash_pool in trash:
+ for image in trash_pool['value']:
if image['id'] == image_id:
return image
self.remove_namespace('rbd', 'ns')
def test_move_image_to_trash(self):
- id = self.create_image_in_trash('rbd', 'test_rbd')
+ img_id = self.create_image_in_trash('rbd', 'test_rbd')
self.get_image('rbd', None, 'test_rbd')
self.assertStatus(404)
time.sleep(1)
- image = self.get_trash('rbd', id)
+ image = self.get_trash('rbd', img_id)
self.assertIsNotNone(image)
- self.remove_trash('rbd', id)
+ self.remove_trash('rbd', img_id)
def test_list_trash(self):
- id = self.create_image_in_trash('rbd', 'test_rbd', 0)
+ img_id = self.create_image_in_trash('rbd', 'test_rbd', 0)
data = self._get('/api/block/image/trash/?pool_name={}'.format('rbd'))
self.assertStatus(200)
self.assertIsInstance(data, list)
self.assertIsNotNone(data)
- self.remove_trash('rbd', id)
+ self.remove_trash('rbd', img_id)
self.assertStatus(204)
def test_restore_trash(self):
- id = self.create_image_in_trash('rbd', 'test_rbd')
+ img_id = self.create_image_in_trash('rbd', 'test_rbd')
- self.restore_trash('rbd', None, id, 'test_rbd')
+ self.restore_trash('rbd', None, img_id, 'test_rbd')
self.get_image('rbd', None, 'test_rbd')
self.assertStatus(200)
- image = self.get_trash('rbd', id)
+ image = self.get_trash('rbd', img_id)
self.assertIsNone(image)
self.remove_image('rbd', None, 'test_rbd')
def test_remove_expired_trash(self):
- id = self.create_image_in_trash('rbd', 'test_rbd', 0)
- self.remove_trash('rbd', id, False)
+ img_id = self.create_image_in_trash('rbd', 'test_rbd', 0)
+ self.remove_trash('rbd', img_id, False)
self.assertStatus(204)
- image = self.get_trash('rbd', id)
+ image = self.get_trash('rbd', img_id)
self.assertIsNone(image)
def test_remove_not_expired_trash(self):
- id = self.create_image_in_trash('rbd', 'test_rbd', 9999)
- self.remove_trash('rbd', id, False)
+ img_id = self.create_image_in_trash('rbd', 'test_rbd', 9999)
+ self.remove_trash('rbd', img_id, False)
self.assertStatus(400)
time.sleep(1)
- image = self.get_trash('rbd', id)
+ image = self.get_trash('rbd', img_id)
self.assertIsNotNone(image)
- self.remove_trash('rbd', id, True)
+ self.remove_trash('rbd', img_id, True)
def test_remove_not_expired_trash_with_force(self):
- id = self.create_image_in_trash('rbd', 'test_rbd', 9999)
- self.remove_trash('rbd', id, True)
+ img_id = self.create_image_in_trash('rbd', 'test_rbd', 9999)
+ self.remove_trash('rbd', img_id, True)
self.assertStatus(204)
- image = self.get_trash('rbd', id)
+ image = self.get_trash('rbd', img_id)
self.assertIsNone(image)
def test_purge_trash(self):
class RbdMirroringTest(DashboardTestCase):
AUTH_ROLES = ['pool-manager', 'block-manager']
- @classmethod
- def create_pool(cls, name, application='rbd'):
- data = {
- 'pool': name,
- 'pg_num': 2**3,
- 'pool_type': 'replicated',
- 'application_metadata': [application]
- }
- cls._task_post("/api/pool", data)
-
@classmethod
def get_pool(cls, pool):
data = cls._get('/api/block/mirroring/pool/{}'.format(pool))
@classmethod
def setUpClass(cls):
super(RbdMirroringTest, cls).setUpClass()
- cls.create_pool('rbd')
+ cls.create_pool('rbd', 2**3, 'replicated')
@classmethod
def tearDownClass(cls):
'lock_retention_period_days': JLeaf(int),
'lock_retention_period_years': JLeaf(int)
},
-                 allow_unknown=True))
+            allow_unknown=True))
self.assertTrue(data['lock_enabled'])
self.assertEqual(data['lock_mode'], 'GOVERNANCE')
self.assertEqual(data['lock_retention_period_days'], 0)
# -*- coding: utf-8 -*-
+# pylint: disable=too-many-public-methods
from __future__ import absolute_import
with mock.patch.object(mgr, 'get_latest', return_value=1146609664):
yield
+ def _get_drive_group_data(self, service_id='all_hdd', host_pattern_k='host_pattern',
+ host_pattern_v='*'):
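+ """Build a drive_groups payload; a wrong host-pattern key makes the spec invalid."""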
+ return {
+ 'method': 'drive_groups',
+ 'data': [
+ {
+ 'service_type': 'osd',
+ 'service_id': service_id,
+ 'data_devices': {
+ 'rotational': True
+ },
+ host_pattern_k: host_pattern_v
+ }
+ ],
+ 'tracking_id': 'all_hdd, b_ssd'
+ }
+
def test_osd_list_aggregation(self):
"""
This test emulates the state of a cluster where an OSD has only been
@mock.patch('dashboard.controllers.osd.CephService')
def test_osd_create_bare(self, ceph_service):
ceph_service.send_command.return_value = '5'
+ sample_data = {
+ 'uuid': 'f860ca2e-757d-48ce-b74a-87052cad563f',
+ 'svc_id': 5
+ }
+
data = {
'method': 'bare',
- 'data': {
- 'uuid': 'f860ca2e-757d-48ce-b74a-87052cad563f',
- 'svc_id': 5
- },
+ 'data': sample_data,
'tracking_id': 'bare-5'
}
self._task_post('/api/osd', data)
instance.return_value = fake_client
# Valid DriveGroup
- data = {
- 'method': 'drive_groups',
- 'data': [
- {
- 'service_type': 'osd',
- 'service_id': 'all_hdd',
- 'data_devices': {
- 'rotational': True
- },
- 'host_pattern': '*',
- }
- ],
- 'tracking_id': 'all_hdd, b_ssd'
- }
+ data = self._get_drive_group_data()
# Without orchestrator service
fake_client.available.return_value = False
fake_client.get_missing_features.return_value = []
# Invalid DriveGroup
- data = {
- 'method': 'drive_groups',
- 'data': [
- {
- 'service_type': 'osd',
- 'service_id': 'invalid_dg',
- 'data_devices': {
- 'rotational': True
- },
- 'host_pattern_wrong': 'unknown',
- }
- ],
- 'tracking_id': 'all_hdd, b_ssd'
- }
+ data = self._get_drive_group_data('invalid_dg', 'host_pattern_wrong', 'unknown')
self._task_post('/api/osd', data)
self.assertStatus(400)
[pylint]
# Allow similarity/code duplication detection
jobs = 1
-dirs = . api controllers plugins services tests
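+# Also lint the dashboard QA tests, which live under qa/ in the source tree.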
+dirs = . api controllers plugins services tests ../../../../qa/tasks/mgr/dashboard
addopts = -rn --rcfile=.pylintrc --jobs={[pylint]jobs}
[rstlint]