import requests
import six
+from teuthology.exceptions import CommandFailedError
from ..mgr_test_case import MgrTestCase
log = logging.getLogger(__name__)
-def authenticate(func):
- def decorate(self, *args, **kwargs):
- self._ceph_cmd(['dashboard', 'set-login-credentials', 'admin', 'admin'])
- self._post('/api/auth', {'username': 'admin', 'password': 'admin'})
- self.assertStatus(201)
- return func(self, *args, **kwargs)
- return decorate
-
-
class DashboardTestCase(MgrTestCase):
MGRS_REQUIRED = 2
MDSS_REQUIRED = 1
_session = None
_resp = None
+ _loggedin = False
+ _base_uri = None
+
+ AUTO_AUTHENTICATE = True
+
+ AUTH_ROLES = ['administrator']
+
+ @classmethod
+ def create_user(cls, username, password, roles):
+ try:
+ cls._ceph_cmd(['dashboard', 'ac-user-show', username])
+ cls._ceph_cmd(['dashboard', 'ac-user-delete', username])
+ except CommandFailedError as ex:
+ if ex.exitstatus != 2:
+ raise ex
+
+ cls._ceph_cmd(['dashboard', 'ac-user-create', username, password])
+
+ set_roles_args = ['dashboard', 'ac-user-set-roles', username]
+ for idx, role in enumerate(roles):
+ if isinstance(role, str):
+ set_roles_args.append(role)
+ else:
+ assert isinstance(role, dict)
+ rolename = 'test_role_{}'.format(idx)
+ try:
+ cls._ceph_cmd(['dashboard', 'ac-role-show', rolename])
+ cls._ceph_cmd(['dashboard', 'ac-role-delete', rolename])
+ except CommandFailedError as ex:
+ if ex.exitstatus != 2:
+ raise ex
+ cls._ceph_cmd(['dashboard', 'ac-role-create', rolename])
+ for mod, perms in role.items():
+ args = ['dashboard', 'ac-role-add-scope-perms', rolename, mod]
+ args.extend(perms)
+ cls._ceph_cmd(args)
+ set_roles_args.append(rolename)
+ cls._ceph_cmd(set_roles_args)
+
+ @classmethod
+ def login(cls, username, password):
+ if cls._loggedin:
+ cls.logout()
+ cls._post('/api/auth', {'username': username, 'password': password})
+ cls._loggedin = True
+
+ @classmethod
+ def logout(cls):
+ if cls._loggedin:
+ cls._delete('/api/auth')
+ cls._loggedin = False
+
+ @classmethod
+ def delete_user(cls, username, roles=None):
+ if roles is None:
+ roles = []
+ cls._ceph_cmd(['dashboard', 'ac-user-delete', username])
+ for idx, role in enumerate(roles):
+ if isinstance(role, dict):
+ cls._ceph_cmd(['dashboard', 'ac-role-delete', 'test_role_{}'.format(idx)])
+
+ @classmethod
+ def RunAs(cls, username, password, roles):
+ def wrapper(func):
+ def execute(self, *args, **kwargs):
+ self.create_user(username, password, roles)
+ self.login(username, password)
+ res = func(self, *args, **kwargs)
+ self.logout()
+ self.delete_user(username, roles)
+ return res
+ return execute
+ return wrapper
@classmethod
def setUpClass(cls):
super(DashboardTestCase, cls).setUpClass()
cls._assign_ports("dashboard", "server_port")
cls._load_module("dashboard")
- cls.base_uri = cls._get_uri("dashboard").rstrip('/')
+ cls._base_uri = cls._get_uri("dashboard").rstrip('/')
if cls.CEPHFS:
cls.mds_cluster.clear_firewall()
cls._session = requests.Session()
cls._resp = None
+ cls.create_user('admin', 'admin', cls.AUTH_ROLES)
+ if cls.AUTO_AUTHENTICATE:
+ cls.login('admin', 'admin')
+
+ def setUp(self):
+ if not self._loggedin and self.AUTO_AUTHENTICATE:
+ self.login('admin', 'admin')
+
@classmethod
def tearDownClass(cls):
super(DashboardTestCase, cls).tearDownClass()
# pylint: disable=inconsistent-return-statements
@classmethod
def _request(cls, url, method, data=None, params=None):
- url = "{}{}".format(cls.base_uri, url)
+ url = "{}{}".format(cls._base_uri, url)
log.info("request %s to %s", method, url)
if method == 'GET':
cls._resp = cls._session.get(url, params=params, verify=False)
@classmethod
def _task_request(cls, method, url, data, timeout):
res = cls._request(url, method, data)
- cls._assertIn(cls._resp.status_code, [200, 201, 202, 204, 400])
+ cls._assertIn(cls._resp.status_code, [200, 201, 202, 204, 400, 403])
+
+ if cls._resp.status_code == 403:
+ return None
if cls._resp.status_code != 202:
log.info("task finished immediately")
class AuthTest(DashboardTestCase):
+
+ AUTO_AUTHENTICATE = False
+
def setUp(self):
self.reset_session()
- self._ceph_cmd(['dashboard', 'set-login-credentials', 'admin', 'admin'])
def test_a_set_login_credentials(self):
- self._ceph_cmd(['dashboard', 'set-login-credentials', 'admin2', 'admin2'])
+ self.create_user('admin2', 'admin2', ['administrator'])
self._post("/api/auth", {'username': 'admin2', 'password': 'admin2'})
self.assertStatus(201)
self.assertJsonBody({"username": "admin2"})
+ self.delete_user('admin2')
def test_login_valid(self):
self._post("/api/auth", {'username': 'admin', 'password': 'admin'})
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-from .helper import DashboardTestCase, authenticate
+from .helper import DashboardTestCase
class CephfsTest(DashboardTestCase):
CEPHFS = True
- @authenticate
+ AUTH_ROLES = ['cephfs-manager']
+
+ @DashboardTestCase.RunAs('test', 'test', ['block-manager'])
+ def test_access_permissions(self):
+ fs_id = self.fs.get_namespace_id()
+ self._get("/api/cephfs/{}/clients".format(fs_id))
+ self.assertStatus(403)
+ self._get("/api/cephfs/{}".format(fs_id))
+ self.assertStatus(403)
+ self._get("/api/cephfs/{}/mds_counters".format(fs_id))
+ self.assertStatus(403)
+
def test_cephfs_clients(self):
fs_id = self.fs.get_namespace_id()
data = self._get("/api/cephfs/{}/clients".format(fs_id))
self.assertIn('status', data)
self.assertIn('data', data)
- @authenticate
def test_cephfs_get(self):
fs_id = self.fs.get_namespace_id()
data = self._get("/api/cephfs/{}/".format(fs_id))
self.assertIsNotNone(data['standbys'])
self.assertIsNotNone(data['versions'])
- @authenticate
def test_cephfs_mds_counters(self):
fs_id = self.fs.get_namespace_id()
data = self._get("/api/cephfs/{}/mds_counters".format(fs_id))
self.assertIsInstance(data, dict)
self.assertIsNotNone(data)
- @authenticate
def test_cephfs_mds_counters_wrong(self):
self._get("/api/cephfs/baadbaad/mds_counters")
self.assertStatus(400)
self.assertJsonBody({
- "component": 'cephfs',
- "code": "invalid_cephfs_id",
- "detail": "Invalid cephfs ID baadbaad"
- })
+ "component": 'cephfs',
+ "code": "invalid_cephfs_id",
+ "detail": "Invalid cephfs ID baadbaad"
+ })
- @authenticate
def test_cephfs_list(self):
data = self._get("/api/cephfs/")
self.assertStatus(200)
import time
-from .helper import DashboardTestCase, authenticate
+from .helper import DashboardTestCase
class ClusterConfigurationTest(DashboardTestCase):
- @authenticate
+
def test_list(self):
data = self._get('/api/cluster_conf')
self.assertStatus(200)
for conf in data:
self._validate_single(conf)
- @authenticate
def test_get(self):
data = self._get('/api/cluster_conf/admin_socket')
self.assertStatus(200)
data = self._get('/api/cluster_conf/fantasy_name')
self.assertStatus(404)
- @authenticate
def test_get_specific_db_config_option(self):
def _get_mon_allow_pool_delete_config():
data = self._get('/api/cluster_conf/mon_allow_pool_delete')
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-from .helper import DashboardTestCase, authenticate
+from .helper import DashboardTestCase
class DashboardTest(DashboardTestCase):
CEPHFS = True
- @authenticate
def test_health(self):
data = self._get("/api/dashboard/health")
self.assertStatus(200)
self.assertIsNotNone(data['mgr_map'])
self.assertIsNotNone(data['df'])
+
+
+ @DashboardTestCase.RunAs('test', 'test', ['pool-manager'])
+ def test_health_permissions(self):
+ data = self._get("/api/dashboard/health")
+ self.assertStatus(200)
+
+ self.assertIn('health', data)
+ self.assertNotIn('mon_status', data)
+ self.assertNotIn('fs_map', data)
+ self.assertNotIn('osd_map', data)
+ self.assertNotIn('clog', data)
+ self.assertNotIn('audit_log', data)
+ self.assertIn('pools', data)
+ self.assertNotIn('mgr_map', data)
+ self.assertIn('df', data)
+ self.assertIsNotNone(data['health'])
+ self.assertIsNotNone(data['pools'])
+
+ cluster_pools = self.ceph_cluster.mon_manager.list_pools()
+ self.assertEqual(len(cluster_pools), len(data['pools']))
+ for pool in data['pools']:
+ self.assertIn(pool['pool_name'], cluster_pools)
+
+ self.assertIsNotNone(data['df'])
import unittest
-from .helper import DashboardTestCase, authenticate
+from .helper import DashboardTestCase
class ECPTest(DashboardTestCase):
+ AUTH_ROLES = ['pool-manager']
+
+ @DashboardTestCase.RunAs('test', 'test', ['block-manager'])
+ def test_read_access_permissions(self):
+ self._get("/api/erasure_code_profile")
+ self.assertStatus(403)
+
+ @DashboardTestCase.RunAs('test', 'test', ['read-only'])
+ def test_write_access_permissions(self):
+ self._get("/api/erasure_code_profile")
+ self.assertStatus(200)
+ data = {'name': 'ecp32', 'k': 3, 'm': 2}
+ self._post('/api/erasure_code_profile', data)
+ self.assertStatus(403)
+ self._delete('/api/erasure_code_profile/default')
+ self.assertStatus(403)
@classmethod
def tearDownClass(cls):
cls._ceph_cmd(['osd', 'erasure-code-profile', 'rm', 'ecp32'])
cls._ceph_cmd(['osd', 'erasure-code-profile', 'rm', 'lrc'])
- @authenticate
def test_list(self):
data = self._get('/api/erasure_code_profile')
self.assertStatus(200)
self.assertEqual(get_data, default[0])
- @authenticate
def test_create(self):
data = {'name': 'ecp32', 'k': 3, 'm': 2}
self._post('/api/erasure_code_profile', data)
self._delete('/api/erasure_code_profile/ecp32')
self.assertStatus(204)
- @authenticate
def test_create_plugin(self):
data = {'name': 'lrc', 'k': '2', 'm': '2', 'l': '2', 'plugin': 'lrc'}
self._post('/api/erasure_code_profile', data)
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-from .helper import DashboardTestCase, authenticate
+from .helper import DashboardTestCase
class HostControllerTest(DashboardTestCase):
- @authenticate
+ AUTH_ROLES = ['read-only']
+
+ @DashboardTestCase.RunAs('test', 'test', ['block-manager'])
+ def test_access_permissions(self):
+ self._get('/api/host')
+ self.assertStatus(403)
+
def test_host_list(self):
data = self._get('/api/host')
self.assertStatus(200)
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-from .helper import DashboardTestCase, authenticate
+from .helper import DashboardTestCase
class MonitorTest(DashboardTestCase):
- @authenticate
+ AUTH_ROLES = ['cluster-manager']
+
+ @DashboardTestCase.RunAs('test', 'test', ['block-manager'])
+ def test_access_permissions(self):
+ self._get('/api/monitor')
+ self.assertStatus(403)
+
+
def test_monitor_default(self):
data = self._get("/api/monitor")
self.assertStatus(200)
import json
-from .helper import DashboardTestCase, authenticate, JObj, JAny, JList, JLeaf, JTuple
+from .helper import DashboardTestCase, JObj, JAny, JList, JLeaf, JTuple
class OsdTest(DashboardTestCase):
+ AUTH_ROLES = ['cluster-manager']
+
+ @DashboardTestCase.RunAs('test', 'test', ['block-manager'])
+ def test_access_permissions(self):
+ self._get('/api/osd')
+ self.assertStatus(403)
+ self._get('/api/osd/0')
+ self.assertStatus(403)
+
def assert_in_and_not_none(self, data, properties):
self.assertSchema(data, JObj({p: JAny(none=False) for p in properties}, allow_unknown=True))
- @authenticate
def test_list(self):
data = self._get('/api/osd')
self.assertStatus(200)
self.assertSchema(data['stats_history']['op_out_bytes'],
JList(JTuple([JLeaf(int), JLeaf(float)])))
- @authenticate
def test_details(self):
data = self._get('/api/osd/0')
self.assertStatus(200)
self.assert_in_and_not_none(data['histogram']['osd'], ['op_w_latency_in_bytes_histogram',
'op_r_latency_out_bytes_histogram'])
- @authenticate
def test_scrub(self):
self._post('/api/osd/0/scrub?deep=False')
self.assertStatus(200)
cls._put('/api/osd/flags', data={'flags': flags})
return sorted(cls._resp.json())
- @authenticate
def test_list_osd_flags(self):
flags = self._get('/api/osd/flags')
self.assertStatus(200)
self.assertEqual(len(flags), 3)
self.assertEqual(sorted(flags), self._initial_flags)
- @authenticate
def test_add_osd_flag(self):
flags = self._put_flags([
'sortbitwise', 'recovery_deletes', 'purged_snapdirs', 'noout',
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-from .helper import DashboardTestCase, authenticate
+from .helper import DashboardTestCase
class PerfCountersControllerTest(DashboardTestCase):
- @authenticate
def test_perf_counters_list(self):
data = self._get('/api/perf_counters')
self.assertStatus(200)
self.assertIn('value', counter)
- @authenticate
def test_perf_counters_mon_get(self):
mon = self.mons()[0]
data = self._get('/api/perf_counters/mon/{}'.format(mon))
self.assertStatus(200)
self._validate_perf(mon, 'mon', data, allow_empty=False)
- @authenticate
def test_perf_counters_mgr_get(self):
mgr = self.mgr_cluster.mgr_ids[0]
data = self._get('/api/perf_counters/mgr/{}'.format(mgr))
self.assertStatus(200)
self._validate_perf(mgr, 'mgr', data, allow_empty=False)
- @authenticate
def test_perf_counters_mds_get(self):
for mds in self.mds_cluster.mds_ids:
data = self._get('/api/perf_counters/mds/{}'.format(mds))
self.assertStatus(200)
self._validate_perf(mds, 'mds', data, allow_empty=True)
- @authenticate
def test_perf_counters_osd_get(self):
for osd in self.ceph_cluster.mon_manager.get_osd_dump():
osd = osd['osd']
import six
-from .helper import DashboardTestCase, authenticate
+from .helper import DashboardTestCase
log = logging.getLogger(__name__)
class PoolTest(DashboardTestCase):
+ AUTH_ROLES = ['pool-manager']
+
@classmethod
def tearDownClass(cls):
super(PoolTest, cls).tearDownClass()
cls._ceph_cmd(['osd', 'pool', 'delete', name, name, '--yes-i-really-really-mean-it'])
cls._ceph_cmd(['osd', 'erasure-code-profile', 'rm', 'ecprofile'])
- @authenticate
+ @DashboardTestCase.RunAs('test', 'test', [{'pool': ['create', 'update', 'delete']}])
+ def test_read_access_permissions(self):
+ self._get('/api/pool')
+ self.assertStatus(403)
+ self._get('/api/pool/bla')
+ self.assertStatus(403)
+
+ @DashboardTestCase.RunAs('test', 'test', [{'pool': ['read', 'update', 'delete']}])
+ def test_create_access_permissions(self):
+ self._post('/api/pool/', {})
+ self.assertStatus(403)
+
+ @DashboardTestCase.RunAs('test', 'test', [{'pool': ['read', 'create', 'update']}])
+ def test_delete_access_permissions(self):
+ self._delete('/api/pool/ddd')
+ self.assertStatus(403)
+
def test_pool_list(self):
data = self._get("/api/pool")
self.assertStatus(200)
self.assertNotIn('stats', pool)
self.assertIn(pool['pool_name'], cluster_pools)
- @authenticate
def test_pool_list_attrs(self):
data = self._get("/api/pool?attrs=type,flags")
self.assertStatus(200)
self.assertNotIn('stats', pool)
self.assertIn(pool['pool_name'], cluster_pools)
- @authenticate
def test_pool_list_stats(self):
data = self._get("/api/pool?stats=true")
self.assertStatus(200)
self.assertIn('flags_names', pool)
self.assertIn(pool['pool_name'], cluster_pools)
- @authenticate
def test_pool_get(self):
cluster_pools = self.ceph_cluster.mon_manager.list_pools()
pool = self._get("/api/pool/{}?stats=true&attrs=type,flags,stats"
self.assertIn('stats', pool)
self.assertNotIn('flags_names', pool)
- @authenticate
def _pool_create(self, data):
try:
self._post('/api/pool/', data)
for data in pools:
self._pool_create(data)
- @authenticate
def test_pool_create_fail(self):
data = {'pool_type': u'replicated', 'rule_name': u'dnf', 'pg_num': u'8', 'pool': u'sadfs'}
self._post('/api/pool/', data)
'detail': "[errno -2] specified rule dnf doesn't exist"
})
- @authenticate
def test_pool_info(self):
info_data = self._get("/api/pool/_info")
self.assertEqual(set(info_data),
all(isinstance(n, six.string_types) for n in info_data['compression_algorithms']))
self.assertTrue(
all(isinstance(n, six.string_types) for n in info_data['compression_modes']))
-
-
class RbdTest(DashboardTestCase):
-
- @classmethod
- def authenticate(cls):
- cls._ceph_cmd(['dashboard', 'set-login-credentials', 'admin', 'admin'])
- cls._post('/api/auth', {'username': 'admin', 'password': 'admin'})
+ AUTH_ROLES = ['pool-manager', 'block-manager']
@classmethod
def create_pool(cls, name, pg_num, pool_type, application='rbd'):
data['flags'] = ['ec_overwrites']
cls._post("/api/pool", data)
+ @DashboardTestCase.RunAs('test', 'test', [{'rbd-image': ['create', 'update', 'delete']}])
+ def test_read_access_permissions(self):
+ self._get('/api/block/image')
+ self.assertStatus(403)
+ self._get('/api/block/image/pool/image')
+ self.assertStatus(403)
+
+ @DashboardTestCase.RunAs('test', 'test', [{'rbd-image': ['read', 'update', 'delete']}])
+ def test_create_access_permissions(self):
+ self.create_image('pool', 'name', 0)
+ self.assertStatus(403)
+ self.create_snapshot('pool', 'image', 'snapshot')
+ self.assertStatus(403)
+ self.copy_image('src_pool', 'src_image', 'dest_pool', 'dest_image')
+ self.assertStatus(403)
+ self.clone_image('parent_pool', 'parent_image', 'parent_snap', 'pool', 'name')
+ self.assertStatus(403)
+
+ @DashboardTestCase.RunAs('test', 'test', [{'rbd-image': ['read', 'create', 'delete']}])
+ def test_update_access_permissions(self):
+ self.edit_image('pool', 'image')
+ self.assertStatus(403)
+ self.update_snapshot('pool', 'image', 'snapshot', None, None)
+ self.assertStatus(403)
+ self._task_post('/api/block/image/rbd/rollback_img/snap/snap1/rollback')
+ self.assertStatus(403)
+ self.flatten_image('pool', 'image')
+ self.assertStatus(403)
+
+ @DashboardTestCase.RunAs('test', 'test', [{'rbd-image': ['read', 'create', 'update']}])
+ def test_delete_access_permissions(self):
+ self.remove_image('pool', 'image')
+ self.assertStatus(403)
+ self.remove_snapshot('pool', 'image', 'snapshot')
+ self.assertStatus(403)
+
@classmethod
def create_image(cls, pool, name, size, **kwargs):
data = {'name': name, 'pool_name': pool, 'size': size}
@classmethod
def setUpClass(cls):
super(RbdTest, cls).setUpClass()
- cls.authenticate()
cls.create_pool('rbd', 10, 'replicated')
cls.create_pool('rbd_iscsi', 10, 'replicated')
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-import urllib
import logging
-logger = logging.getLogger(__name__)
-from .helper import DashboardTestCase, authenticate
+from .helper import DashboardTestCase
+
+
+logger = logging.getLogger(__name__)
class RgwTestCase(DashboardTestCase):
maxDiff = None
create_test_user = False
+ AUTH_ROLES = ['rgw-manager']
+
@classmethod
def setUpClass(cls):
super(RgwTestCase, cls).setUpClass()
def tearDownClass(cls):
if cls.create_test_user:
cls._radosgw_admin_cmd(['user', 'rm', '--uid=teuth-test-user'])
- super(DashboardTestCase, cls).tearDownClass()
+ super(RgwTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ super(RgwTestCase, self).setUp()
def get_rgw_user(self, uid):
return self._get('/api/rgw/user/{}'.format(uid))
class RgwApiCredentialsTest(RgwTestCase):
+ AUTH_ROLES = ['rgw-manager']
+
def setUp(self):
# Restart the Dashboard module to ensure that the connection to the
# RGW Admin Ops API is re-established with the new credentials.
+ self.logout()
self._ceph_cmd(['mgr', 'module', 'disable', 'dashboard'])
self._ceph_cmd(['mgr', 'module', 'enable', 'dashboard', '--force'])
# Set the default credentials.
self._ceph_cmd(['dashboard', 'set-rgw-api-user-id', ''])
self._ceph_cmd(['dashboard', 'set-rgw-api-secret-key', 'admin'])
self._ceph_cmd(['dashboard', 'set-rgw-api-access-key', 'admin'])
+ super(RgwApiCredentialsTest, self).setUp()
- @authenticate
def test_no_access_secret_key(self):
self._ceph_cmd(['dashboard', 'set-rgw-api-secret-key', ''])
self._ceph_cmd(['dashboard', 'set-rgw-api-access-key', ''])
self.assertIn('detail', resp)
self.assertIn('component', resp)
self.assertIn('No RGW credentials found', resp['detail'])
- self.assertEquals(resp['component'], 'rgw')
+ self.assertEqual(resp['component'], 'rgw')
- @authenticate
def test_success(self):
data = self._get('/api/rgw/status')
self.assertStatus(200)
self.assertIn('message', data)
self.assertTrue(data['available'])
- @authenticate
def test_invalid_user_id(self):
self._ceph_cmd(['dashboard', 'set-rgw-api-user-id', 'xyz'])
data = self._get('/api/rgw/status')
class RgwBucketTest(RgwTestCase):
+ AUTH_ROLES = ['rgw-manager']
+
@classmethod
def setUpClass(cls):
cls.create_test_user = True
super(RgwBucketTest, cls).setUpClass()
- @authenticate
def test_all(self):
# Create a new bucket.
self._post(
self.assertIn('creation_time', data)
self.assertIn('name', data['bucket'])
self.assertIn('bucket_id', data['bucket'])
- self.assertEquals(data['bucket']['name'], 'teuth-test-bucket')
+ self.assertEqual(data['bucket']['name'], 'teuth-test-bucket')
# List all buckets.
data = self._get('/api/rgw/bucket')
self.assertIn('bucket', data)
self.assertIn('bucket_quota', data)
self.assertIn('owner', data)
- self.assertEquals(data['bucket'], 'teuth-test-bucket')
- self.assertEquals(data['owner'], 'admin')
+ self.assertEqual(data['bucket'], 'teuth-test-bucket')
+ self.assertEqual(data['owner'], 'admin')
# Update the bucket.
self._put(
self.assertStatus(200)
data = self._get('/api/rgw/bucket/teuth-test-bucket')
self.assertStatus(200)
- self.assertEquals(data['owner'], 'teuth-test-user')
+ self.assertEqual(data['owner'], 'teuth-test-user')
# Delete the bucket.
self._delete('/api/rgw/bucket/teuth-test-bucket')
class RgwDaemonTest(DashboardTestCase):
- @authenticate
+ AUTH_ROLES = ['rgw-manager']
+
+
+ @DashboardTestCase.RunAs('test', 'test', [{'rgw': ['create', 'update', 'delete']}])
+ def test_read_access_permissions(self):
+ self._get('/api/rgw/daemon')
+ self.assertStatus(403)
+ self._get('/api/rgw/daemon/id')
+ self.assertStatus(403)
+
def test_list(self):
data = self._get('/api/rgw/daemon')
self.assertStatus(200)
self.assertIn('version', data)
self.assertIn('server_hostname', data)
- @authenticate
def test_get(self):
data = self._get('/api/rgw/daemon')
self.assertStatus(200)
self.assertIn('rgw_status', data)
self.assertTrue(data['rgw_metadata'])
- @authenticate
def test_status(self):
self._radosgw_admin_cmd([
'user', 'create', '--uid=admin', '--display-name=admin',
class RgwUserTest(RgwTestCase):
+ AUTH_ROLES = ['rgw-manager']
+
@classmethod
def setUpClass(cls):
super(RgwUserTest, cls).setUpClass()
self.assertIn('tenant', data)
self.assertIn('user_id', data)
- @authenticate
def test_get(self):
data = self.get_rgw_user('admin')
self.assertStatus(200)
self._assert_user_data(data)
- self.assertEquals(data['user_id'], 'admin')
+ self.assertEqual(data['user_id'], 'admin')
- @authenticate
def test_list(self):
data = self._get('/api/rgw/user')
self.assertStatus(200)
self.assertGreaterEqual(len(data), 1)
self.assertIn('admin', data)
- @authenticate
def test_create_update_delete(self):
# Create a new user.
self._post('/api/rgw/user', params={
self.assertStatus(201)
data = self.jsonBody()
self._assert_user_data(data)
- self.assertEquals(data['user_id'], 'teuth-test-user')
- self.assertEquals(data['display_name'], 'display name')
+ self.assertEqual(data['user_id'], 'teuth-test-user')
+ self.assertEqual(data['display_name'], 'display name')
# Get the user.
data = self.get_rgw_user('teuth-test-user')
self.assertStatus(200)
self._assert_user_data(data)
- self.assertEquals(data['user_id'], 'teuth-test-user')
+ self.assertEqual(data['user_id'], 'teuth-test-user')
# Update the user.
self._put(
class RgwUserCapabilityTest(RgwTestCase):
+ AUTH_ROLES = ['rgw-manager']
+
@classmethod
def setUpClass(cls):
cls.create_test_user = True
super(RgwUserCapabilityTest, cls).setUpClass()
- @authenticate
def test_set(self):
self._post(
'/api/rgw/user/teuth-test-user/capability',
self.assertEqual(data['caps'][0]['type'], 'usage')
self.assertEqual(data['caps'][0]['perm'], 'read')
- @authenticate
def test_delete(self):
self._delete(
'/api/rgw/user/teuth-test-user/capability',
class RgwUserKeyTest(RgwTestCase):
+ AUTH_ROLES = ['rgw-manager']
+
@classmethod
def setUpClass(cls):
cls.create_test_user = True
super(RgwUserKeyTest, cls).setUpClass()
- @authenticate
def test_create_s3(self):
self._post(
'/api/rgw/user/teuth-test-user/key',
self.assertIsInstance(key, object)
self.assertEqual(key['secret_key'], 'aaabbbccc')
- @authenticate
def test_create_swift(self):
self._post(
'/api/rgw/user/teuth-test-user/key',
key = self.find_in_list('secret_key', 'xxxyyyzzz', data)
self.assertIsInstance(key, object)
- @authenticate
def test_delete_s3(self):
self._delete(
'/api/rgw/user/teuth-test-user/key',
})
self.assertStatus(204)
- @authenticate
def test_delete_swift(self):
self._delete(
'/api/rgw/user/teuth-test-user/key',
class RgwUserQuotaTest(RgwTestCase):
+ AUTH_ROLES = ['rgw-manager']
+
@classmethod
def setUpClass(cls):
cls.create_test_user = True
self.assertIn('max_size_kb', data['bucket_quota'])
self.assertIn('max_size', data['bucket_quota'])
- @authenticate
def test_get_quota(self):
data = self._get('/api/rgw/user/teuth-test-user/quota')
self.assertStatus(200)
self._assert_quota(data)
- @authenticate
def test_set_user_quota(self):
self._put(
'/api/rgw/user/teuth-test-user/quota',
self.assertTrue(data['user_quota']['enabled'])
self.assertEqual(data['user_quota']['max_size_kb'], 2048)
- @authenticate
def test_set_bucket_quota(self):
self._put(
'/api/rgw/user/teuth-test-user/quota',
class RgwUserSubuserTest(RgwTestCase):
+ AUTH_ROLES = ['rgw-manager']
+
@classmethod
def setUpClass(cls):
cls.create_test_user = True
super(RgwUserSubuserTest, cls).setUpClass()
- @authenticate
def test_create_swift(self):
self._post(
'/api/rgw/user/teuth-test-user/subuser',
key = self.find_in_list('user', 'teuth-test-user:tux', data['swift_keys'])
self.assertIsInstance(key, object)
- @authenticate
def test_create_s3(self):
self._post(
'/api/rgw/user/teuth-test-user/subuser',
self.assertIsInstance(key, object)
self.assertEqual(key['secret_key'], 'xxx')
- @authenticate
def test_delete_w_purge(self):
self._delete(
'/api/rgw/user/teuth-test-user/subuser/teuth-test-subuser2')
data['swift_keys'])
self.assertIsNone(key)
- @authenticate
def test_delete_wo_purge(self):
self._delete(
'/api/rgw/user/teuth-test-user/subuser/teuth-test-subuser',
from __future__ import absolute_import
-from .helper import DashboardTestCase, authenticate
+from .helper import DashboardTestCase
class SummaryTest(DashboardTestCase):
CEPHFS = True
- @authenticate
def test_summary(self):
data = self._get("/api/summary")
self.assertStatus(200)
self.assertIsNotNone(data['mgr_id'])
self.assertIsNotNone(data['have_mon_connection'])
self.assertEqual(data['rbd_mirroring'], {'errors': 0, 'warnings': 0})
+
+ @DashboardTestCase.RunAs('test', 'test', ['pool-manager'])
+ def test_summary_permissions(self):
+ data = self._get("/api/summary")
+ self.assertStatus(200)
+
+ self.assertIn('health_status', data)
+ self.assertIn('mgr_id', data)
+ self.assertIn('have_mon_connection', data)
+ self.assertNotIn('rbd_mirroring', data)
+ self.assertIn('executing_tasks', data)
+ self.assertIn('finished_tasks', data)
+ self.assertIn('version', data)
+ self.assertIsNotNone(data['health_status'])
+ self.assertIsNotNone(data['mgr_id'])
+ self.assertIsNotNone(data['have_mon_connection'])
+