]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/dashboard: User database migration has been cut out 42142/head
authorVolker Theile <vtheile@suse.com>
Wed, 30 Jun 2021 12:00:28 +0000 (14:00 +0200)
committerErnesto Puerta <epuertat@redhat.com>
Thu, 1 Jul 2021 16:23:35 +0000 (18:23 +0200)
This PR will revert changes done by https://tracker.ceph.com/issues/49645 to auto-migrate user database v1 to v2.

Fixes: https://tracker.ceph.com/issues/51443
Signed-off-by: Volker Theile <vtheile@suse.com>
(cherry picked from commit 82922811612813840a684af993ca7c95a0b48d53)

src/pybind/mgr/dashboard/services/access_control.py
src/pybind/mgr/dashboard/tests/test_access_control.py

index 3cc9d62205a103783989abc2681e96847c46cdd8..5d573a3d6945cf3f1adacc3df7c50036f6ba931f 100644 (file)
@@ -517,6 +517,29 @@ class AccessControlDB(object):
             version = cls.VERSION
         return "{}{}".format(cls.ACDB_CONFIG_KEY, version)
 
+    def check_and_update_db(self):
+        logger.debug("Checking for previous DB versions")
+
+        def check_migrate_v1_to_current():
+            # Check if version 1 exists in the DB and migrate it to current version
+            v1_db = mgr.get_store(self.accessdb_config_key(1))
+            if v1_db:
+                logger.debug("Found database v1 credentials")
+                v1_db = json.loads(v1_db)
+
+                for user, _ in v1_db['users'].items():
+                    v1_db['users'][user]['enabled'] = True
+                    v1_db['users'][user]['pwdExpirationDate'] = None
+                    v1_db['users'][user]['pwdUpdateRequired'] = False
+
+                self.roles = {rn: Role.from_dict(r) for rn, r in v1_db.get('roles', {}).items()}
+                self.users = {un: User.from_dict(u, dict(self.roles, **SYSTEM_ROLES))
+                              for un, u in v1_db.get('users', {}).items()}
+
+                self.save()
+
+        check_migrate_v1_to_current()
+
     @classmethod
     def load(cls):
         logger.info("Loading user roles DB version=%s", cls.VERSION)
@@ -525,6 +548,8 @@ class AccessControlDB(object):
         if json_db is None:
             logger.debug("No DB v%s found, creating new...", cls.VERSION)
             db = cls(cls.VERSION, {}, {})
+            # check if we can update from a previous version database
+            db.check_and_update_db()
             return db
 
         dict_db = json.loads(json_db)
index d384c4765b7b724c45371714edfbfea824eb3c36..c362d7b9b413f50869bc2c229d819f1c6f4f9789 100644 (file)
@@ -689,6 +689,115 @@ class AccessControlTest(unittest.TestCase, CLICommandTestMixin):
         self.validate_persistent_user('admin', ['read-only'], pass_hash,
                                       'admin User', 'admin@user.com')
 
+    def test_load_v1(self):
+        self.CONFIG_KEY_DICT['accessdb_v1'] = '''
+            {{
+                "users": {{
+                    "admin": {{
+                        "username": "admin",
+                        "password":
+                "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+                        "roles": ["block-manager", "test_role"],
+                        "name": "admin User",
+                        "email": "admin@user.com",
+                        "lastUpdate": {}
+                    }}
+                }},
+                "roles": {{
+                    "test_role": {{
+                        "name": "test_role",
+                        "description": "Test Role",
+                        "scopes_permissions": {{
+                            "{}": ["{}", "{}"],
+                            "{}": ["{}"]
+                        }}
+                    }}
+                }},
+                "version": 1
+            }}
+        '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
+                   Permission.UPDATE, Scope.POOL, Permission.CREATE)
+
+        load_access_control_db()
+        role = self.exec_cmd('ac-role-show', rolename="test_role")
+        self.assertDictEqual(role, {
+            'name': 'test_role',
+            'description': "Test Role",
+            'scopes_permissions': {
+                Scope.ISCSI: [Permission.READ, Permission.UPDATE],
+                Scope.POOL: [Permission.CREATE]
+            }
+        })
+        user = self.exec_cmd('ac-user-show', username="admin")
+        self.assertDictEqual(user, {
+            'username': 'admin',
+            'lastUpdate': user['lastUpdate'],
+            'password':
+                "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+            'pwdExpirationDate': None,
+            'pwdUpdateRequired': False,
+            'name': 'admin User',
+            'email': 'admin@user.com',
+            'roles': ['block-manager', 'test_role'],
+            'enabled': True
+        })
+
+    def test_load_v2(self):
+        self.CONFIG_KEY_DICT['accessdb_v2'] = '''
+            {{
+                "users": {{
+                    "admin": {{
+                        "username": "admin",
+                        "password":
+                "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+                        "pwdExpirationDate": null,
+                        "pwdUpdateRequired": false,
+                        "roles": ["block-manager", "test_role"],
+                        "name": "admin User",
+                        "email": "admin@user.com",
+                        "lastUpdate": {},
+                        "enabled": true
+                    }}
+                }},
+                "roles": {{
+                    "test_role": {{
+                        "name": "test_role",
+                        "description": "Test Role",
+                        "scopes_permissions": {{
+                            "{}": ["{}", "{}"],
+                            "{}": ["{}"]
+                        }}
+                    }}
+                }},
+                "version": 2
+            }}
+        '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
+                   Permission.UPDATE, Scope.POOL, Permission.CREATE)
+
+        load_access_control_db()
+        role = self.exec_cmd('ac-role-show', rolename="test_role")
+        self.assertDictEqual(role, {
+            'name': 'test_role',
+            'description': "Test Role",
+            'scopes_permissions': {
+                Scope.ISCSI: [Permission.READ, Permission.UPDATE],
+                Scope.POOL: [Permission.CREATE]
+            }
+        })
+        user = self.exec_cmd('ac-user-show', username="admin")
+        self.assertDictEqual(user, {
+            'username': 'admin',
+            'lastUpdate': user['lastUpdate'],
+            'password':
+                "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+            'pwdExpirationDate': None,
+            'pwdUpdateRequired': False,
+            'name': 'admin User',
+            'email': 'admin@user.com',
+            'roles': ['block-manager', 'test_role'],
+            'enabled': True
+        })
+
     def test_password_policy_pw_length(self):
         Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True
         Settings.PWD_POLICY_MIN_LENGTH = 3