]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/dashboard: Add REST API for role management 23322/head
authorRicardo Marques <rimarques@suse.com>
Mon, 30 Jul 2018 13:26:00 +0000 (14:26 +0100)
committerRicardo Marques <rimarques@suse.com>
Mon, 3 Sep 2018 12:15:42 +0000 (13:15 +0100)
Fixes: https://tracker.ceph.com/issues/25138
Signed-off-by: Ricardo Marques <rimarques@suse.com>
qa/tasks/mgr/dashboard/test_role.py
src/pybind/mgr/dashboard/controllers/role.py
src/pybind/mgr/dashboard/services/access_control.py

index 75fcdbc5e8e1207886f6073e992bff59cd1f9878..120279def3a28201d55c7998a3fd8a949275e600 100644 (file)
@@ -6,6 +6,52 @@ from .helper import DashboardTestCase
 
 
 class RoleTest(DashboardTestCase):
+    @classmethod
+    def _create_role(cls, name=None, description=None, scopes_permissions=None):
+        data = {}
+        if name:
+            data['name'] = name
+        if description:
+            data['description'] = description
+        if scopes_permissions:
+            data['scopes_permissions'] = scopes_permissions
+        cls._post('/api/role', data)
+
+    def test_crud_role(self):
+        self._create_role(name='role1',
+                          description='Description 1',
+                          scopes_permissions={'osd': ['read']})
+        self.assertStatus(201)
+        self.assertJsonBody({
+            'name': 'role1',
+            'description': 'Description 1',
+            'scopes_permissions': {'osd': ['read']},
+            'system': False
+        })
+
+        self._get('/api/role/role1')
+        self.assertStatus(200)
+        self.assertJsonBody({
+            'name': 'role1',
+            'description': 'Description 1',
+            'scopes_permissions': {'osd': ['read']},
+            'system': False
+        })
+
+        self._put('/api/role/role1', {
+            'description': 'Description 2',
+            'scopes_permissions': {'osd': ['read', 'update']},
+        })
+        self.assertStatus(200)
+        self.assertJsonBody({
+            'name': 'role1',
+            'description': 'Description 2',
+            'scopes_permissions': {'osd': ['read', 'update']},
+            'system': False
+        })
+
+        self._delete('/api/role/role1')
+        self.assertStatus(204)
 
     def test_list_roles(self):
         roles = self._get('/api/role')
@@ -14,4 +60,79 @@ class RoleTest(DashboardTestCase):
         self.assertGreaterEqual(len(roles), 1)
         for role in roles:
             self.assertIn('name', role)
+            self.assertIn('description', role)
             self.assertIn('scopes_permissions', role)
+            self.assertIn('system', role)
+
+    def test_get_role_does_not_exist(self):
+        self._get('/api/role/role2')
+        self.assertStatus(404)
+
+    def test_create_role_already_exists(self):
+        self._create_role(name='read-only',
+                          description='Description 1',
+                          scopes_permissions={'osd': ['read']})
+        self.assertStatus(400)
+        self.assertError(code='role_already_exists',
+                         component='role')
+
+    def test_create_role_no_name(self):
+        self._create_role(description='Description 1',
+                          scopes_permissions={'osd': ['read']})
+        self.assertStatus(400)
+        self.assertError(code='name_required',
+                         component='role')
+
+    def test_create_role_invalid_scope(self):
+        self._create_role(name='role1',
+                          description='Description 1',
+                          scopes_permissions={'invalid-scope': ['read']})
+        self.assertStatus(400)
+        self.assertError(code='invalid_scope',
+                         component='role')
+
+    def test_create_role_invalid_permission(self):
+        self._create_role(name='role1',
+                          description='Description 1',
+                          scopes_permissions={'osd': ['invalid-permission']})
+        self.assertStatus(400)
+        self.assertError(code='invalid_permission',
+                         component='role')
+
+    def test_delete_role_does_not_exist(self):
+        self._delete('/api/role/role2')
+        self.assertStatus(404)
+
+    def test_delete_system_role(self):
+        self._delete('/api/role/read-only')
+        self.assertStatus(400)
+        self.assertError(code='cannot_delete_system_role',
+                         component='role')
+
+    def test_delete_role_associated_with_user(self):
+        self._create_role(name='role1',
+                          description='Description 1',
+                          scopes_permissions={'user': ['create', 'read', 'update', 'delete']})
+        self.assertStatus(201)
+        self._put('/api/user/admin', {'roles': ['role1']})
+        self.assertStatus(200)
+
+        self._delete('/api/role/role1')
+        self.assertStatus(400)
+        self.assertError(code='role_is_associated_with_user',
+                         component='role')
+
+        self._put('/api/user/admin', {'roles': ['administrator']})
+        self.assertStatus(200)
+        self._delete('/api/role/role1')
+        self.assertStatus(204)
+
+    def test_update_role_does_not_exist(self):
+        self._put('/api/role/role2', {})
+        self.assertStatus(404)
+
+    def test_update_system_role(self):
+        self._put('/api/role/read-only', {})
+        self.assertStatus(400)
+        self.assertError(code='cannot_update_system_role',
+                         component='role')
index 1272b5f8aaf0e9c7557421521f924e5b6e423e85..3271442305185f06da5ce7476558d5f588287436 100644 (file)
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 
-from . import ApiController, RESTController
-from ..security import Scope
+import cherrypy
+
+from . import ApiController, RESTController, UiApiController
+from ..exceptions import RoleDoesNotExist, DashboardException,\
+    RoleIsAssociatedWithUser, RoleAlreadyExists
+from ..security import Scope as SecurityScope, Permission
 from ..services.access_control import ACCESS_CTRL_DB, SYSTEM_ROLES
 
 
-@ApiController('/role', Scope.USER)
+@ApiController('/role', SecurityScope.USER)
 class Role(RESTController):
+    @staticmethod
+    def _role_to_dict(role):
+        role_dict = role.to_dict()
+        role_dict['system'] = role_dict['name'] in SYSTEM_ROLES
+        return role_dict
+
+    @staticmethod
+    def _validate_permissions(scopes_permissions):
+        if scopes_permissions:
+            for scope, permissions in scopes_permissions.items():
+                if scope not in SecurityScope.all_scopes():
+                    raise DashboardException(msg='Invalid scope',
+                                             code='invalid_scope',
+                                             component='role')
+                if any(permission not in Permission.all_permissions()
+                       for permission in permissions):
+                    raise DashboardException(msg='Invalid permission',
+                                             code='invalid_permission',
+                                             component='role')
+
+    @staticmethod
+    def _set_permissions(role, scopes_permissions):
+        role.reset_scope_permissions()
+        if scopes_permissions:
+            for scope, permissions in scopes_permissions.items():
+                if permissions:
+                    role.set_scope_permissions(scope, permissions)
+
+    def list(self):
+        roles = dict(ACCESS_CTRL_DB.roles)
+        roles.update(SYSTEM_ROLES)
+        roles = sorted(roles.values(), key=lambda role: role.name)
+        return [Role._role_to_dict(r) for r in roles]
+
+    def get(self, name):
+        role = SYSTEM_ROLES.get(name)
+        if not role:
+            try:
+                role = ACCESS_CTRL_DB.get_role(name)
+            except RoleDoesNotExist:
+                raise cherrypy.HTTPError(404)
+        return Role._role_to_dict(role)
+
+    def create(self, name=None, description=None, scopes_permissions=None):
+        if not name:
+            raise DashboardException(msg='Name is required',
+                                     code='name_required',
+                                     component='role')
+        Role._validate_permissions(scopes_permissions)
+        try:
+            role = ACCESS_CTRL_DB.create_role(name, description)
+        except RoleAlreadyExists:
+            raise DashboardException(msg='Role already exists',
+                                     code='role_already_exists',
+                                     component='role')
+        Role._set_permissions(role, scopes_permissions)
+        ACCESS_CTRL_DB.save()
+        return Role._role_to_dict(role)
+
+    def set(self, name, description=None, scopes_permissions=None):
+        try:
+            role = ACCESS_CTRL_DB.get_role(name)
+        except RoleDoesNotExist:
+            if name in SYSTEM_ROLES:
+                raise DashboardException(msg='Cannot update system role',
+                                         code='cannot_update_system_role',
+                                         component='role')
+            raise cherrypy.HTTPError(404)
+        Role._validate_permissions(scopes_permissions)
+        Role._set_permissions(role, scopes_permissions)
+        role.description = description
+        ACCESS_CTRL_DB.save()
+        return Role._role_to_dict(role)
+
+    def delete(self, name):
+        try:
+            ACCESS_CTRL_DB.delete_role(name)
+        except RoleDoesNotExist:
+            if name in SYSTEM_ROLES:
+                raise DashboardException(msg='Cannot delete system role',
+                                         code='cannot_delete_system_role',
+                                         component='role')
+            raise cherrypy.HTTPError(404)
+        except RoleIsAssociatedWithUser:
+            raise DashboardException(msg='Role is associated with user',
+                                     code='role_is_associated_with_user',
+                                     component='role')
+        ACCESS_CTRL_DB.save()
+
+
+@UiApiController('/scope', SecurityScope.USER)
+class Scope(RESTController):
     def list(self):
-        all_roles = dict(ACCESS_CTRL_DB.roles)
-        all_roles.update(SYSTEM_ROLES)
-        items = sorted(all_roles.items(), key=lambda role: role[1].name)
-        return [r.to_dict() for _, r in items]
+        return SecurityScope.all_scopes()
index f980d6e932a3879865dd8cbf7d520ba9acd553cf..ed669c2c68f66407e170331e4278d99005fbd64d 100644 (file)
@@ -59,6 +59,9 @@ class Role(object):
             raise ScopeNotInRole(scope, self.name)
         del self.scopes_permissions[scope]
 
+    def reset_scope_permissions(self):
+        self.scopes_permissions = {}
+
     def authorize(self, scope, permissions):
         if scope in self.scopes_permissions:
             role_perms = self.scopes_permissions[scope]