- *Create User*::
- $ ceph dashboard ac-user-create <username> [<password>] [<rolename>] [<name>] [<email>]
+ $ ceph dashboard ac-user-create <username> [<password>] [<rolename>] [<name>] [<email>] [<enabled>]
- *Delete User*::
$ ceph dashboard ac-user-set-info <username> <name> <email>
+- *Disable User*::
+
+ $ ceph dashboard ac-user-disable <username>
+
+- *Enable User*::
+
+ $ ceph dashboard ac-user-enable <username>
User Roles and Permissions
^^^^^^^^^^^^^^^^^^^^^^^^^^
class UserTest(DashboardTestCase):
@classmethod
- def _create_user(cls, username=None, password=None, name=None, email=None, roles=None):
+ def _create_user(cls, username=None, password=None, name=None, email=None, roles=None, enabled=True):
data = {}
if username:
data['username'] = username
data['email'] = email
if roles:
data['roles'] = roles
+ data['enabled'] = enabled
cls._post("/api/user", data)
def test_crud_user(self):
'name': 'My Name',
'email': 'my@email.com',
'roles': ['administrator'],
- 'lastUpdate': user['lastUpdate']
+ 'lastUpdate': user['lastUpdate'],
+ 'enabled': True
})
self._put('/api/user/user1', {
'name': 'My New Name',
'email': 'mynew@email.com',
'roles': ['block-manager'],
- 'lastUpdate': user['lastUpdate']
+ 'lastUpdate': user['lastUpdate'],
+ 'enabled': True
})
self._delete('/api/user/user1')
'name': None,
'email': None,
'roles': ['administrator'],
- 'lastUpdate': user['lastUpdate']
+ 'lastUpdate': user['lastUpdate'],
+ 'enabled': True
}])
def test_create_user_already_exists(self):
self.assertError(code='cannot_delete_current_user',
component='user')
+ @DashboardTestCase.RunAs('test', 'test', [{'user': ['create', 'read', 'update', 'delete']}])
+ def test_disable_current_user(self):
+ self._put('/api/user/test', {'enabled': False})
+ self.assertStatus(400)
+ self.assertError(code='cannot_disable_current_user',
+ component='user')
+
def test_update_user_does_not_exist(self):
self._put('/api/user/user2', {'name': 'My New Name'})
self.assertStatus(404)
raise cherrypy.HTTPError(404)
return User._user_to_dict(user)
- def create(self, username=None, password=None, name=None, email=None, roles=None):
+ def create(self, username=None, password=None, name=None, email=None,
+ roles=None, enabled=True):
if not username:
raise DashboardException(msg='Username is required',
code='username_required',
if roles:
user_roles = User._get_user_roles(roles)
try:
- user = mgr.ACCESS_CTRL_DB.create_user(username, password, name, email)
+ user = mgr.ACCESS_CTRL_DB.create_user(username, password, name,
+ email, enabled)
except UserAlreadyExists:
raise DashboardException(msg='Username already exists',
code='username_already_exists',
raise cherrypy.HTTPError(404)
mgr.ACCESS_CTRL_DB.save()
- def set(self, username, password=None, name=None, email=None, roles=None):
+ def set(self, username, password=None, name=None, email=None, roles=None,
+ enabled=None):
+ if JwtManager.get_username() == username and enabled is False:
+ raise DashboardException(msg='You are not allowed to disable your user',
+ code='cannot_disable_current_user',
+ component='user')
+
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
except UserDoesNotExist:
user.set_password(password)
user.name = name
user.email = email
+ if enabled is not None:
+ user.enabled = enabled
user.set_roles(user_roles)
mgr.ACCESS_CTRL_DB.save()
return User._user_to_dict(user)
class User(object):
def __init__(self, username, password, name=None, email=None, roles=None,
- lastUpdate=None):
+ lastUpdate=None, enabled=True):
self.username = username
self.password = password
self.name = name
self.refreshLastUpdate()
else:
self.lastUpdate = lastUpdate
+ self._enabled = enabled
def refreshLastUpdate(self):
self.lastUpdate = int(time.time())
+ @property
+ def enabled(self):
+ return self._enabled
+
+ @enabled.setter
+ def enabled(self, value):
+ self._enabled = value
+ self.refreshLastUpdate()
+
def set_password(self, password):
self.set_password_hash(password_hash(password))
'roles': sorted([r.name for r in self.roles]),
'name': self.name,
'email': self.email,
- 'lastUpdate': self.lastUpdate
+ 'lastUpdate': self.lastUpdate,
+ 'enabled': self.enabled
}
@classmethod
class AccessControlDB(object):
- VERSION = 1
+ VERSION = 2
ACDB_CONFIG_KEY = "accessdb_v"
def __init__(self, version, users, roles):
del self.roles[name]
- def create_user(self, username, password, name, email):
+ def create_user(self, username, password, name, email, enabled=True):
logger.debug("AC: creating user: username=%s", username)
with self.lock:
if username in self.users:
raise UserAlreadyExists(username)
- user = User(username, password_hash(password), name, email)
+ user = User(username, password_hash(password), name, email, enabled=enabled)
self.users[username] = user
return user
def check_and_update_db(self):
logger.debug("AC: Checking for previews DB versions")
- if self.VERSION == 1: # current version
+
+ def check_migrate_v0_to_current():
# check if there is username/password from previous version
username = mgr.get_module_option('username', None)
password = mgr.get_module_option('password', None)
if username and password:
- logger.debug("AC: Found single user credentials: user=%s",
- username)
+ logger.debug("AC: Found single user credentials: user=%s", username)
# found user credentials
user = self.create_user(username, "", None, None)
# password is already hashed, so setting manually
user.password = password
user.add_roles([ADMIN_ROLE])
self.save()
+
+ def check_migrate_v1_to_current():
+ # Check if version 1 exists in the DB and migrate it to current version
+ v1_db = mgr.get_store(self.accessdb_config_key(1))
+ if v1_db:
+ logger.debug("AC: Found database v1 credentials")
+ v1_db = json.loads(v1_db)
+
+ for user, _ in v1_db['users'].items():
+ v1_db['users'][user]['enabled'] = True
+
+ self.roles = {rn: Role.from_dict(r) for rn, r in v1_db.get('roles', {}).items()}
+ self.users = {un: User.from_dict(u, dict(self.roles, **SYSTEM_ROLES))
+ for un, u in v1_db.get('users', {}).items()}
+
+ self.save()
+ else:
+ # If version 1 does not exist, check if migration of VERSION "0" needs to be done
+ check_migrate_v0_to_current()
+
+ if self.VERSION == 1: # current version
+ check_migrate_v0_to_current()
+
+ elif self.VERSION == 2: # current version
+ check_migrate_v1_to_current()
+
else:
raise NotImplementedError()
'name=password,type=CephString,req=false '
'name=rolename,type=CephString,req=false '
'name=name,type=CephString,req=false '
- 'name=email,type=CephString,req=false',
+ 'name=email,type=CephString,req=false '
+ 'name=enabled,type=CephBool,req=false',
'Create a user')
def ac_user_create_cmd(_, username, password=None, rolename=None, name=None,
- email=None):
+ email=None, enabled=True):
try:
role = mgr.ACCESS_CTRL_DB.get_role(rolename) if rolename else None
except RoleDoesNotExist as ex:
role = SYSTEM_ROLES[rolename]
try:
- user = mgr.ACCESS_CTRL_DB.create_user(username, password, name, email)
+ user = mgr.ACCESS_CTRL_DB.create_user(username, password, name, email, enabled)
except UserAlreadyExists as ex:
return -errno.EEXIST, '', str(ex)
return 0, json.dumps(user.to_dict()), ''
+@CLIWriteCommand('dashboard ac-user-enable',
+ 'name=username,type=CephString',
+ 'Enable a user')
+def ac_user_enable(_, username):
+ try:
+ user = mgr.ACCESS_CTRL_DB.get_user(username)
+ user.enabled = True
+
+ mgr.ACCESS_CTRL_DB.save()
+ return 0, json.dumps(user.to_dict()), ''
+ except UserDoesNotExist as ex:
+ return -errno.ENOENT, '', str(ex)
+
+
+@CLIWriteCommand('dashboard ac-user-disable',
+ 'name=username,type=CephString',
+ 'Disable a user')
+def ac_user_disable(_, username):
+ try:
+ user = mgr.ACCESS_CTRL_DB.get_user(username)
+ user.enabled = False
+
+ mgr.ACCESS_CTRL_DB.save()
+ return 0, json.dumps(user.to_dict()), ''
+ except UserDoesNotExist as ex:
+ return -errno.ENOENT, '', str(ex)
+
+
@CLIWriteCommand('dashboard ac-user-delete',
'name=username,type=CephString',
'Delete user')
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
if user.password:
- if user.compare_password(password):
+ if user.enabled and user.compare_password(password):
return user.permissions_dict()
except UserDoesNotExist:
logger.debug("User '%s' does not exist", username)
self.assertNotIn(rolename, db['roles'])
def validate_persistent_user(self, username, roles, password=None,
- name=None, email=None, lastUpdate=None):
+ name=None, email=None, lastUpdate=None,
+ enabled=True):
db = self.load_persistent_db()
self.assertIn('users', db)
self.assertIn(username, db['users'])
self.assertEqual(db['users'][username]['email'], email)
if lastUpdate:
self.assertEqual(db['users'][username]['lastUpdate'], lastUpdate)
+ self.assertEqual(db['users'][username]['enabled'], enabled)
def validate_persistent_no_user(self, username):
db = self.load_persistent_db()
'lastUpdate': user['lastUpdate'],
'name': '{} User'.format(username),
'email': '{}@user.com'.format(username),
- 'roles': [rolename] if rolename else []
+ 'roles': [rolename] if rolename else [],
+ 'enabled': True
})
self.validate_persistent_user(username, [rolename] if rolename else [],
pass_hash, '{} User'.format(username),
'{}@user.com'.format(username),
- user['lastUpdate'])
+ user['lastUpdate'], True)
return user
def test_create_user_with_role(self):
'password': pass_hash,
'name': 'admin User',
'email': 'admin@user.com',
- 'roles': ['block-manager', 'pool-manager']
+ 'roles': ['block-manager', 'pool-manager'],
+ 'enabled': True
})
def test_show_nonexistent_user(self):
'name': 'Admin Name',
'email': 'admin@admin.com',
'lastUpdate': user['lastUpdate'],
- 'roles': []
+ 'roles': [],
+ 'enabled': True
})
self.validate_persistent_user('admin', [], pass_hash, 'Admin Name',
'admin@admin.com')
'name': 'admin User',
'email': 'admin@user.com',
'lastUpdate': user['lastUpdate'],
- 'roles': []
+ 'roles': [],
+ 'enabled': True
})
self.validate_persistent_user('admin', [], pass_hash, 'admin User',
'admin@user.com')
'name': None,
'email': None,
'lastUpdate': user['lastUpdate'],
- 'roles': ['administrator']
+ 'roles': ['administrator'],
+ 'enabled': True,
})
self.validate_persistent_user('admin', ['administrator'], pass_hash,
None, None)
'name': 'admin User',
'email': 'admin@user.com',
'lastUpdate': user['lastUpdate'],
- 'roles': ['read-only']
+ 'roles': ['read-only'],
+ 'enabled': True
})
self.validate_persistent_user('admin', ['read-only'], pass_hash,
'admin User', 'admin@user.com')
"$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
'name': 'admin User',
'email': 'admin@user.com',
- 'roles': ['block-manager', 'test_role']
+ 'roles': ['block-manager', 'test_role'],
+ 'enabled': True
+ })
+
+ def test_load_v2(self):
+ """
+ The `enabled` attribute of a user has been added in v2
+ """
+ self.CONFIG_KEY_DICT['accessdb_v1'] = '''
+ {{
+ "users": {{
+ "admin": {{
+ "username": "admin",
+ "password":
+ "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+ "roles": ["block-manager", "test_role"],
+ "name": "admin User",
+ "email": "admin@user.com",
+ "lastUpdate": {},
+ "enabled": true
+ }}
+ }},
+ "roles": {{
+ "test_role": {{
+ "name": "test_role",
+ "description": "Test Role",
+ "scopes_permissions": {{
+ "{}": ["{}", "{}"],
+ "{}": ["{}"]
+ }}
+ }}
+ }},
+ "version": 1
+ }}
+ '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
+ Permission.UPDATE, Scope.POOL, Permission.CREATE)
+
+ load_access_control_db()
+ role = self.exec_cmd('ac-role-show', rolename="test_role")
+ self.assertDictEqual(role, {
+ 'name': 'test_role',
+ 'description': "Test Role",
+ 'scopes_permissions': {
+ Scope.ISCSI: [Permission.READ, Permission.UPDATE],
+ Scope.POOL: [Permission.CREATE]
+ }
+ })
+ user = self.exec_cmd('ac-user-show', username="admin")
+ self.assertDictEqual(user, {
+ 'username': 'admin',
+ 'lastUpdate': user['lastUpdate'],
+ 'password':
+ "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+ 'name': 'admin User',
+ 'email': 'admin@user.com',
+ 'roles': ['block-manager', 'test_role'],
+ 'enabled': True
})
def test_update_from_previous_version_v1(self):
"$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
'name': None,
'email': None,
- 'roles': ['administrator']
+ 'roles': ['administrator'],
+ 'enabled': True
})