--- /dev/null
+# -*- coding: utf-8 -*-
+
+from __future__ import absolute_import
+
+from .helper import DashboardTestCase
+
+
+class RoleTest(DashboardTestCase):
+
+ def test_list_roles(self):
+ roles = self._get('/api/role')
+ self.assertStatus(200)
+
+ self.assertGreaterEqual(len(roles), 1)
+ for role in roles:
+ self.assertIn('name', role)
+ self.assertIn('scopes_permissions', role)
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from __future__ import absolute_import
+
+from .helper import DashboardTestCase
+
+
+class UserTest(DashboardTestCase):
+
+ @classmethod
+ def _create_user(cls, username=None, password=None, name=None, email=None, roles=None):
+ data = {}
+ if username:
+ data['username'] = username
+ if password:
+ data['password'] = password
+ if name:
+ data['name'] = name
+ if email:
+ data['email'] = email
+ if roles:
+ data['roles'] = roles
+ cls._post("/api/user", data)
+
+ def test_crud_user(self):
+ self._create_user(username='user1',
+ password='mypassword',
+ name='My Name',
+ email='my@email.com',
+ roles=['administrator'])
+ self.assertStatus(201)
+
+ self._get('/api/user/user1')
+ self.assertStatus(200)
+ self.assertJsonBody({
+ 'username': 'user1',
+ 'name': 'My Name',
+ 'email': 'my@email.com',
+ 'roles': ['administrator']
+ })
+
+ self._put('/api/user/user1', {
+ 'name': 'My New Name',
+ 'email': 'mynew@email.com',
+ 'roles': ['block-manager'],
+ })
+ self.assertStatus(200)
+ self.assertJsonBody({
+ 'username': 'user1',
+ 'name': 'My New Name',
+ 'email': 'mynew@email.com',
+ 'roles': ['block-manager']
+ })
+
+ self._delete('/api/user/user1')
+ self.assertStatus(204)
+
+ def test_list_users(self):
+ self._get('/api/user')
+ self.assertStatus(200)
+ self.assertJsonBody([{
+ 'username': 'admin',
+ 'name': None,
+ 'email': None,
+ 'roles': ['administrator']
+ }])
+
+ def test_create_user_already_exists(self):
+ self._create_user(username='admin',
+ password='mypassword',
+ name='administrator',
+ email='my@email.com',
+ roles=['administrator'])
+ self.assertStatus(400)
+ self.assertError(code='username_already_exists',
+ component='user')
+
+ def test_create_user_no_password(self):
+ self._create_user(username='user1',
+ name='My Name',
+ email='admin@email.com',
+ roles=['administrator'])
+ self.assertStatus(400)
+ self.assertError(code='password_required',
+ component='user')
+
+ def test_create_user_invalid_role(self):
+ self._create_user(username='user1',
+ password='mypassword',
+ name='My Name',
+ email='my@email.com',
+ roles=['invalid-role'])
+ self.assertStatus(400)
+ self.assertError(code='role_does_not_exist',
+ component='user')
+
+ def test_delete_user_does_not_exist(self):
+ self._delete('/api/user/user2')
+ self.assertStatus(404)
+
+ @DashboardTestCase.RunAs('test', 'test', [{'user': ['create', 'read', 'update', 'delete']}])
+ def test_delete_current_user(self):
+ self._delete('/api/user/test')
+ self.assertStatus(400)
+ self.assertError(code='cannot_delete_current_user',
+ component='user')
+
+ def test_update_user_does_not_exist(self):
+ self._put('/api/user/user2', {'name': 'My New Name'})
+ self.assertStatus(404)
+
+ def test_update_user_invalid_role(self):
+ self._put('/api/user/admin', {'roles': ['invalid-role']})
+ self.assertStatus(400)
+ self.assertError(code='role_does_not_exist',
+ component='user')
--- /dev/null
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+from . import ApiController, RESTController
+from ..security import Scope
+from ..services.access_control import ACCESS_CTRL_DB, SYSTEM_ROLES
+
+
+@ApiController('/role', Scope.USER)
+class Role(RESTController):
+ def list(self):
+ all_roles = dict(ACCESS_CTRL_DB.roles)
+ all_roles.update(SYSTEM_ROLES)
+ items = sorted(all_roles.items(), key=lambda role: role[1].name)
+ return [r.to_dict() for _, r in items]
--- /dev/null
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+import cherrypy
+
+from . import ApiController, RESTController
+from ..exceptions import DashboardException, UserAlreadyExists, \
+ UserDoesNotExist
+from ..security import Scope
+from ..services.access_control import ACCESS_CTRL_DB, SYSTEM_ROLES
+from ..tools import Session
+
+
+@ApiController('/user', Scope.USER)
+class User(RESTController):
+ @staticmethod
+ def _user_to_dict(user):
+ result = user.to_dict()
+ del result['password']
+ return result
+
+ @staticmethod
+ def _get_user_roles(roles):
+ all_roles = dict(ACCESS_CTRL_DB.roles)
+ all_roles.update(SYSTEM_ROLES)
+ try:
+ return [all_roles[rolename] for rolename in roles]
+ except KeyError:
+ raise DashboardException(msg='Role does not exist',
+ code='role_does_not_exist',
+ component='user')
+
+ def list(self):
+ users = ACCESS_CTRL_DB.users
+ result = [User._user_to_dict(u) for _, u in users.items()]
+ return result
+
+ def get(self, username):
+ try:
+ user = ACCESS_CTRL_DB.get_user(username)
+ except UserDoesNotExist:
+ raise cherrypy.HTTPError(404)
+ return User._user_to_dict(user)
+
+ def create(self, username=None, password=None, name=None, email=None, roles=None):
+ if not username:
+ raise DashboardException(msg='Username is required',
+ code='username_required',
+ component='user')
+ if not password:
+ raise DashboardException(msg='Password is required',
+ code='password_required',
+ component='user')
+ user_roles = None
+ if roles:
+ user_roles = User._get_user_roles(roles)
+ try:
+ user = ACCESS_CTRL_DB.create_user(username, password, name, email)
+ except UserAlreadyExists:
+ raise DashboardException(msg='Username already exists',
+ code='username_already_exists',
+ component='user')
+ if user_roles:
+ user.set_roles(user_roles)
+ ACCESS_CTRL_DB.save()
+ return User._user_to_dict(user)
+
+ def delete(self, username):
+ session_username = cherrypy.session.get(Session.USERNAME)
+ if session_username == username:
+ raise DashboardException(msg='Cannot delete current user',
+ code='cannot_delete_current_user',
+ component='user')
+ try:
+ ACCESS_CTRL_DB.delete_user(username)
+ except UserDoesNotExist:
+ raise cherrypy.HTTPError(404)
+ ACCESS_CTRL_DB.save()
+
+ def set(self, username, password=None, name=None, email=None, roles=None):
+ try:
+ user = ACCESS_CTRL_DB.get_user(username)
+ except UserDoesNotExist:
+ raise cherrypy.HTTPError(404)
+ user_roles = []
+ if roles:
+ user_roles = User._get_user_roles(roles)
+ if password:
+ user.set_password(password)
+ user.name = name
+ user.email = email
+ user.set_roles(user_roles)
+ ACCESS_CTRL_DB.save()
+ return User._user_to_dict(user)
MANAGER = "manager"
LOG = "log"
GRAFANA = "grafana"
+ USER = "user"
@classmethod
def all_scopes(cls):