Option(name='server_port', type='int', default=8080),
Option(name='ssl_server_port', type='int', default=8443),
Option(name='jwt_token_ttl', type='int', default=28800),
- Option(name='password', type='str', default=''),
Option(name='url_prefix', type='str', default=''),
- Option(name='username', type='str', default=''),
Option(name='key_file', type='str', default=''),
Option(name='crt_file', type='str', default=''),
Option(name='ssl', type='bool', default=True),
version = cls.VERSION
return "{}{}".format(cls.ACDB_CONFIG_KEY, version)
- def check_and_update_db(self):
- logger.debug("AC: Checking for previews DB versions")
- if self.VERSION == 1: # current version
- # check if there is username/password from previous version
- username = mgr.get_module_option('username', None)
- password = mgr.get_module_option('password', None)
- if username and password:
- logger.debug("AC: Found single user credentials: user=%s",
- username)
- # found user credentials
- user = self.create_user(username, "", None, None)
- # password is already hashed, so setting manually
- user.password = password
- user.add_roles([ADMIN_ROLE])
- self.save()
- else:
- raise NotImplementedError()
-
@classmethod
def load(cls):
logger.info("AC: Loading user roles DB version=%s", cls.VERSION)
if json_db is None:
logger.debug("AC: No DB v%s found, creating new...", cls.VERSION)
db = cls(cls.VERSION, {}, {})
- # check if we can update from a previous version database
- db.check_and_update_db()
return db
db = json.loads(json_db)
import errno
import json
import tempfile
-import time
import unittest
from mgr_module import ERROR_MSG_EMPTY_INPUT_FILE
})
self.validate_persistent_user('admin', ['read-only'], pass_hash,
'admin User', 'admin@user.com')
-
- def test_load_v1(self):
- self.CONFIG_KEY_DICT['accessdb_v1'] = '''
- {{
- "users": {{
- "admin": {{
- "username": "admin",
- "password":
- "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
- "roles": ["block-manager", "test_role"],
- "name": "admin User",
- "email": "admin@user.com",
- "lastUpdate": {}
- }}
- }},
- "roles": {{
- "test_role": {{
- "name": "test_role",
- "description": "Test Role",
- "scopes_permissions": {{
- "{}": ["{}", "{}"],
- "{}": ["{}"]
- }}
- }}
- }},
- "version": 1
- }}
- '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
- Permission.UPDATE, Scope.POOL, Permission.CREATE)
-
- load_access_control_db()
- role = self.exec_cmd('ac-role-show', rolename="test_role")
- self.assertDictEqual(role, {
- 'name': 'test_role',
- 'description': "Test Role",
- 'scopes_permissions': {
- Scope.ISCSI: [Permission.READ, Permission.UPDATE],
- Scope.POOL: [Permission.CREATE]
- }
- })
- user = self.exec_cmd('ac-user-show', username="admin")
- self.assertDictEqual(user, {
- 'username': 'admin',
- 'lastUpdate': user['lastUpdate'],
- 'password':
- "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
- 'name': 'admin User',
- 'email': 'admin@user.com',
- 'roles': ['block-manager', 'test_role']
- })
-
- def test_update_from_previous_version_v1(self):
- self.CONFIG_KEY_DICT['username'] = 'admin'
- self.CONFIG_KEY_DICT['password'] = \
- '$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK'
- load_access_control_db()
- user = self.exec_cmd('ac-user-show', username="admin")
- self.assertDictEqual(user, {
- 'username': 'admin',
- 'lastUpdate': user['lastUpdate'],
- 'password':
- "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
- 'name': None,
- 'email': None,
- 'roles': ['administrator']
- })