Option(name='server_port', type='int', default=8080),
Option(name='ssl_server_port', type='int', default=8443),
Option(name='jwt_token_ttl', type='int', default=28800),
- Option(name='password', type='str', default=''),
Option(name='url_prefix', type='str', default=''),
- Option(name='username', type='str', default=''),
Option(name='key_file', type='str', default=''),
Option(name='crt_file', type='str', default=''),
Option(name='ssl', type='bool', default=True),
version = cls.VERSION
return "{}{}".format(cls.ACDB_CONFIG_KEY, version)
- def check_and_update_db(self):
- logger.debug("Checking for previews DB versions")
-
- def check_migrate_v0_to_current():
- # check if there is username/password from previous version
- username = mgr.get_module_option('username', None)
- password = mgr.get_module_option('password', None)
- if username and password:
- logger.debug("Found single user credentials: user=%s", username)
- # found user credentials
- user = self.create_user(username, "", None, None)
- # password is already hashed, so setting manually
- user.password = password
- user.add_roles([ADMIN_ROLE])
- self.save()
-
- def check_migrate_v1_to_current():
- # Check if version 1 exists in the DB and migrate it to current version
- v1_db = mgr.get_store(self.accessdb_config_key(1))
- if v1_db:
- logger.debug("Found database v1 credentials")
- v1_db = json.loads(v1_db)
-
- for user, _ in v1_db['users'].items():
- v1_db['users'][user]['enabled'] = True
- v1_db['users'][user]['pwdExpirationDate'] = None
- v1_db['users'][user]['pwdUpdateRequired'] = False
-
- self.roles = {rn: Role.from_dict(r) for rn, r in v1_db.get('roles', {}).items()}
- self.users = {un: User.from_dict(u, dict(self.roles, **SYSTEM_ROLES))
- for un, u in v1_db.get('users', {}).items()}
-
- self.save()
- else:
- # If version 1 does not exist, check if migration of VERSION "0" needs to be done
- check_migrate_v0_to_current()
-
- check_migrate_v1_to_current()
-
@classmethod
def load(cls):
logger.info("Loading user roles DB version=%s", cls.VERSION)
if json_db is None:
logger.debug("No DB v%s found, creating new...", cls.VERSION)
db = cls(cls.VERSION, {}, {})
- # check if we can update from a previous version database
- db.check_and_update_db()
return db
dict_db = json.loads(json_db)
self.validate_persistent_user('admin', ['read-only'], pass_hash,
'admin User', 'admin@user.com')
- def test_load_v1(self):
- self.CONFIG_KEY_DICT['accessdb_v1'] = '''
- {{
- "users": {{
- "admin": {{
- "username": "admin",
- "password":
- "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
- "roles": ["block-manager", "test_role"],
- "name": "admin User",
- "email": "admin@user.com",
- "lastUpdate": {}
- }}
- }},
- "roles": {{
- "test_role": {{
- "name": "test_role",
- "description": "Test Role",
- "scopes_permissions": {{
- "{}": ["{}", "{}"],
- "{}": ["{}"]
- }}
- }}
- }},
- "version": 1
- }}
- '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
- Permission.UPDATE, Scope.POOL, Permission.CREATE)
-
- load_access_control_db()
- role = self.exec_cmd('ac-role-show', rolename="test_role")
- self.assertDictEqual(role, {
- 'name': 'test_role',
- 'description': "Test Role",
- 'scopes_permissions': {
- Scope.ISCSI: [Permission.READ, Permission.UPDATE],
- Scope.POOL: [Permission.CREATE]
- }
- })
- user = self.exec_cmd('ac-user-show', username="admin")
- self.assertDictEqual(user, {
- 'username': 'admin',
- 'lastUpdate': user['lastUpdate'],
- 'password':
- "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
- 'pwdExpirationDate': None,
- 'pwdUpdateRequired': False,
- 'name': 'admin User',
- 'email': 'admin@user.com',
- 'roles': ['block-manager', 'test_role'],
- 'enabled': True
- })
-
- def test_load_v2(self):
- self.CONFIG_KEY_DICT['accessdb_v2'] = '''
- {{
- "users": {{
- "admin": {{
- "username": "admin",
- "password":
- "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
- "pwdExpirationDate": null,
- "pwdUpdateRequired": false,
- "roles": ["block-manager", "test_role"],
- "name": "admin User",
- "email": "admin@user.com",
- "lastUpdate": {},
- "enabled": true
- }}
- }},
- "roles": {{
- "test_role": {{
- "name": "test_role",
- "description": "Test Role",
- "scopes_permissions": {{
- "{}": ["{}", "{}"],
- "{}": ["{}"]
- }}
- }}
- }},
- "version": 2
- }}
- '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
- Permission.UPDATE, Scope.POOL, Permission.CREATE)
-
- load_access_control_db()
- role = self.exec_cmd('ac-role-show', rolename="test_role")
- self.assertDictEqual(role, {
- 'name': 'test_role',
- 'description': "Test Role",
- 'scopes_permissions': {
- Scope.ISCSI: [Permission.READ, Permission.UPDATE],
- Scope.POOL: [Permission.CREATE]
- }
- })
- user = self.exec_cmd('ac-user-show', username="admin")
- self.assertDictEqual(user, {
- 'username': 'admin',
- 'lastUpdate': user['lastUpdate'],
- 'password':
- "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
- 'pwdExpirationDate': None,
- 'pwdUpdateRequired': False,
- 'name': 'admin User',
- 'email': 'admin@user.com',
- 'roles': ['block-manager', 'test_role'],
- 'enabled': True
- })
-
- def test_update_from_previous_version_v1(self):
- self.CONFIG_KEY_DICT['username'] = 'admin'
- self.CONFIG_KEY_DICT['password'] = \
- '$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK'
- load_access_control_db()
- user = self.exec_cmd('ac-user-show', username="admin")
- self.assertDictEqual(user, {
- 'username': 'admin',
- 'lastUpdate': user['lastUpdate'],
- 'password':
- "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
- 'pwdExpirationDate': None,
- 'pwdUpdateRequired': False,
- 'name': None,
- 'email': None,
- 'roles': ['administrator'],
- 'enabled': True
- })
-
def test_password_policy_pw_length(self):
Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True
Settings.PWD_POLICY_MIN_LENGTH = 3