version = cls.VERSION
return "{}{}".format(cls.ACDB_CONFIG_KEY, version)
+ def check_and_update_db(self):
+ logger.debug("Checking for previous DB versions")
+
+ def check_migrate_v1_to_current():
+ # Check if version 1 exists in the DB and migrate it to current version
+ v1_db = mgr.get_store(self.accessdb_config_key(1))
+ if v1_db:
+ logger.debug("Found database v1 credentials")
+ v1_db = json.loads(v1_db)
+
+ for user, _ in v1_db['users'].items():
+ v1_db['users'][user]['enabled'] = True
+ v1_db['users'][user]['pwdExpirationDate'] = None
+ v1_db['users'][user]['pwdUpdateRequired'] = False
+
+ self.roles = {rn: Role.from_dict(r) for rn, r in v1_db.get('roles', {}).items()}
+ self.users = {un: User.from_dict(u, dict(self.roles, **SYSTEM_ROLES))
+ for un, u in v1_db.get('users', {}).items()}
+
+ self.save()
+
+ check_migrate_v1_to_current()
+
@classmethod
def load(cls):
logger.info("Loading user roles DB version=%s", cls.VERSION)
if json_db is None:
logger.debug("No DB v%s found, creating new...", cls.VERSION)
db = cls(cls.VERSION, {}, {})
+ # check if we can update from a previous version database
+ db.check_and_update_db()
return db
dict_db = json.loads(json_db)
self.validate_persistent_user('admin', ['read-only'], pass_hash,
'admin User', 'admin@user.com')
+ def test_load_v1(self):
+ self.CONFIG_KEY_DICT['accessdb_v1'] = '''
+ {{
+ "users": {{
+ "admin": {{
+ "username": "admin",
+ "password":
+ "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+ "roles": ["block-manager", "test_role"],
+ "name": "admin User",
+ "email": "admin@user.com",
+ "lastUpdate": {}
+ }}
+ }},
+ "roles": {{
+ "test_role": {{
+ "name": "test_role",
+ "description": "Test Role",
+ "scopes_permissions": {{
+ "{}": ["{}", "{}"],
+ "{}": ["{}"]
+ }}
+ }}
+ }},
+ "version": 1
+ }}
+ '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
+ Permission.UPDATE, Scope.POOL, Permission.CREATE)
+
+ load_access_control_db()
+ role = self.exec_cmd('ac-role-show', rolename="test_role")
+ self.assertDictEqual(role, {
+ 'name': 'test_role',
+ 'description': "Test Role",
+ 'scopes_permissions': {
+ Scope.ISCSI: [Permission.READ, Permission.UPDATE],
+ Scope.POOL: [Permission.CREATE]
+ }
+ })
+ user = self.exec_cmd('ac-user-show', username="admin")
+ self.assertDictEqual(user, {
+ 'username': 'admin',
+ 'lastUpdate': user['lastUpdate'],
+ 'password':
+ "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+ 'pwdExpirationDate': None,
+ 'pwdUpdateRequired': False,
+ 'name': 'admin User',
+ 'email': 'admin@user.com',
+ 'roles': ['block-manager', 'test_role'],
+ 'enabled': True
+ })
+
+ def test_load_v2(self):
+ self.CONFIG_KEY_DICT['accessdb_v2'] = '''
+ {{
+ "users": {{
+ "admin": {{
+ "username": "admin",
+ "password":
+ "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+ "pwdExpirationDate": null,
+ "pwdUpdateRequired": false,
+ "roles": ["block-manager", "test_role"],
+ "name": "admin User",
+ "email": "admin@user.com",
+ "lastUpdate": {},
+ "enabled": true
+ }}
+ }},
+ "roles": {{
+ "test_role": {{
+ "name": "test_role",
+ "description": "Test Role",
+ "scopes_permissions": {{
+ "{}": ["{}", "{}"],
+ "{}": ["{}"]
+ }}
+ }}
+ }},
+ "version": 2
+ }}
+ '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
+ Permission.UPDATE, Scope.POOL, Permission.CREATE)
+
+ load_access_control_db()
+ role = self.exec_cmd('ac-role-show', rolename="test_role")
+ self.assertDictEqual(role, {
+ 'name': 'test_role',
+ 'description': "Test Role",
+ 'scopes_permissions': {
+ Scope.ISCSI: [Permission.READ, Permission.UPDATE],
+ Scope.POOL: [Permission.CREATE]
+ }
+ })
+ user = self.exec_cmd('ac-user-show', username="admin")
+ self.assertDictEqual(user, {
+ 'username': 'admin',
+ 'lastUpdate': user['lastUpdate'],
+ 'password':
+ "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+ 'pwdExpirationDate': None,
+ 'pwdUpdateRequired': False,
+ 'name': 'admin User',
+ 'email': 'admin@user.com',
+ 'roles': ['block-manager', 'test_role'],
+ 'enabled': True
+ })
+
def test_password_policy_pw_length(self):
Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True
Settings.PWD_POLICY_MIN_LENGTH = 3