class RoleTest(DashboardTestCase):
+ @classmethod
+ def _create_role(cls, name=None, description=None, scopes_permissions=None):
+ data = {}
+ if name:
+ data['name'] = name
+ if description:
+ data['description'] = description
+ if scopes_permissions:
+ data['scopes_permissions'] = scopes_permissions
+ cls._post('/api/role', data)
+
+ def test_crud_role(self):
+ self._create_role(name='role1',
+ description='Description 1',
+ scopes_permissions={'osd': ['read']})
+ self.assertStatus(201)
+ self.assertJsonBody({
+ 'name': 'role1',
+ 'description': 'Description 1',
+ 'scopes_permissions': {'osd': ['read']},
+ 'system': False
+ })
+
+ self._get('/api/role/role1')
+ self.assertStatus(200)
+ self.assertJsonBody({
+ 'name': 'role1',
+ 'description': 'Description 1',
+ 'scopes_permissions': {'osd': ['read']},
+ 'system': False
+ })
+
+ self._put('/api/role/role1', {
+ 'description': 'Description 2',
+ 'scopes_permissions': {'osd': ['read', 'update']},
+ })
+ self.assertStatus(200)
+ self.assertJsonBody({
+ 'name': 'role1',
+ 'description': 'Description 2',
+ 'scopes_permissions': {'osd': ['read', 'update']},
+ 'system': False
+ })
+
+ self._delete('/api/role/role1')
+ self.assertStatus(204)
def test_list_roles(self):
roles = self._get('/api/role')
self.assertGreaterEqual(len(roles), 1)
for role in roles:
self.assertIn('name', role)
+ self.assertIn('description', role)
self.assertIn('scopes_permissions', role)
+ self.assertIn('system', role)
+
+ def test_get_role_does_not_exist(self):
+ self._get('/api/role/role2')
+ self.assertStatus(404)
+
+ def test_create_role_already_exists(self):
+ self._create_role(name='read-only',
+ description='Description 1',
+ scopes_permissions={'osd': ['read']})
+ self.assertStatus(400)
+ self.assertError(code='role_already_exists',
+ component='role')
+
+ def test_create_role_no_name(self):
+ self._create_role(description='Description 1',
+ scopes_permissions={'osd': ['read']})
+ self.assertStatus(400)
+ self.assertError(code='name_required',
+ component='role')
+
+ def test_create_role_invalid_scope(self):
+ self._create_role(name='role1',
+ description='Description 1',
+ scopes_permissions={'invalid-scope': ['read']})
+ self.assertStatus(400)
+ self.assertError(code='invalid_scope',
+ component='role')
+
+ def test_create_role_invalid_permission(self):
+ self._create_role(name='role1',
+ description='Description 1',
+ scopes_permissions={'osd': ['invalid-permission']})
+ self.assertStatus(400)
+ self.assertError(code='invalid_permission',
+ component='role')
+
+ def test_delete_role_does_not_exist(self):
+ self._delete('/api/role/role2')
+ self.assertStatus(404)
+
+ def test_delete_system_role(self):
+ self._delete('/api/role/read-only')
+ self.assertStatus(400)
+ self.assertError(code='cannot_delete_system_role',
+ component='role')
+
+ def test_delete_role_associated_with_user(self):
+ self._create_role(name='role1',
+ description='Description 1',
+ scopes_permissions={'user': ['create', 'read', 'update', 'delete']})
+ self.assertStatus(201)
+ self._put('/api/user/admin', {'roles': ['role1']})
+ self.assertStatus(200)
+
+ self._delete('/api/role/role1')
+ self.assertStatus(400)
+ self.assertError(code='role_is_associated_with_user',
+ component='role')
+
+ self._put('/api/user/admin', {'roles': ['administrator']})
+ self.assertStatus(200)
+ self._delete('/api/role/role1')
+ self.assertStatus(204)
+
+ def test_update_role_does_not_exist(self):
+ self._put('/api/role/role2', {})
+ self.assertStatus(404)
+
+ def test_update_system_role(self):
+ self._put('/api/role/read-only', {})
+ self.assertStatus(400)
+ self.assertError(code='cannot_update_system_role',
+ component='role')
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-from . import ApiController, RESTController
-from ..security import Scope
+import cherrypy
+
+from . import ApiController, RESTController, UiApiController
+from ..exceptions import RoleDoesNotExist, DashboardException,\
+ RoleIsAssociatedWithUser, RoleAlreadyExists
+from ..security import Scope as SecurityScope, Permission
from ..services.access_control import ACCESS_CTRL_DB, SYSTEM_ROLES
-@ApiController('/role', Scope.USER)
+@ApiController('/role', SecurityScope.USER)
class Role(RESTController):
+ @staticmethod
+ def _role_to_dict(role):
+ role_dict = role.to_dict()
+ role_dict['system'] = role_dict['name'] in SYSTEM_ROLES
+ return role_dict
+
+ @staticmethod
+ def _validate_permissions(scopes_permissions):
+ if scopes_permissions:
+ for scope, permissions in scopes_permissions.items():
+ if scope not in SecurityScope.all_scopes():
+ raise DashboardException(msg='Invalid scope',
+ code='invalid_scope',
+ component='role')
+ if any(permission not in Permission.all_permissions()
+ for permission in permissions):
+ raise DashboardException(msg='Invalid permission',
+ code='invalid_permission',
+ component='role')
+
+ @staticmethod
+ def _set_permissions(role, scopes_permissions):
+ role.reset_scope_permissions()
+ if scopes_permissions:
+ for scope, permissions in scopes_permissions.items():
+ if permissions:
+ role.set_scope_permissions(scope, permissions)
+
+ def list(self):
+ roles = dict(ACCESS_CTRL_DB.roles)
+ roles.update(SYSTEM_ROLES)
+ roles = sorted(roles.values(), key=lambda role: role.name)
+ return [Role._role_to_dict(r) for r in roles]
+
+ def get(self, name):
+ role = SYSTEM_ROLES.get(name)
+ if not role:
+ try:
+ role = ACCESS_CTRL_DB.get_role(name)
+ except RoleDoesNotExist:
+ raise cherrypy.HTTPError(404)
+ return Role._role_to_dict(role)
+
+ def create(self, name=None, description=None, scopes_permissions=None):
+ if not name:
+ raise DashboardException(msg='Name is required',
+ code='name_required',
+ component='role')
+ Role._validate_permissions(scopes_permissions)
+ try:
+ role = ACCESS_CTRL_DB.create_role(name, description)
+ except RoleAlreadyExists:
+ raise DashboardException(msg='Role already exists',
+ code='role_already_exists',
+ component='role')
+ Role._set_permissions(role, scopes_permissions)
+ ACCESS_CTRL_DB.save()
+ return Role._role_to_dict(role)
+
+ def set(self, name, description=None, scopes_permissions=None):
+ try:
+ role = ACCESS_CTRL_DB.get_role(name)
+ except RoleDoesNotExist:
+ if name in SYSTEM_ROLES:
+ raise DashboardException(msg='Cannot update system role',
+ code='cannot_update_system_role',
+ component='role')
+ raise cherrypy.HTTPError(404)
+ Role._validate_permissions(scopes_permissions)
+ Role._set_permissions(role, scopes_permissions)
+ role.description = description
+ ACCESS_CTRL_DB.save()
+ return Role._role_to_dict(role)
+
+ def delete(self, name):
+ try:
+ ACCESS_CTRL_DB.delete_role(name)
+ except RoleDoesNotExist:
+ if name in SYSTEM_ROLES:
+ raise DashboardException(msg='Cannot delete system role',
+ code='cannot_delete_system_role',
+ component='role')
+ raise cherrypy.HTTPError(404)
+ except RoleIsAssociatedWithUser:
+ raise DashboardException(msg='Role is associated with user',
+ code='role_is_associated_with_user',
+ component='role')
+ ACCESS_CTRL_DB.save()
+
+
+@UiApiController('/scope', SecurityScope.USER)
+class Scope(RESTController):
def list(self):
- all_roles = dict(ACCESS_CTRL_DB.roles)
- all_roles.update(SYSTEM_ROLES)
- items = sorted(all_roles.items(), key=lambda role: role[1].name)
- return [r.to_dict() for _, r in items]
+ return SecurityScope.all_scopes()
raise ScopeNotInRole(scope, self.name)
del self.scopes_permissions[scope]
+ def reset_scope_permissions(self):
+ self.scopes_permissions = {}
+
def authorize(self, scope, permissions):
if scope in self.scopes_permissions:
role_perms = self.scopes_permissions[scope]