]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/dashboard: replace direct use of bcrypt in dashboard
authorJohn Mulligan <jmulligan@redhat.com>
Tue, 22 Apr 2025 20:31:15 +0000 (16:31 -0400)
committerJohn Mulligan <jmulligan@redhat.com>
Mon, 7 Jul 2025 13:32:24 +0000 (09:32 -0400)
Replace a direct usage of bycrypt with our cryptocaller wrapper.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
src/pybind/mgr/dashboard/services/access_control.py
src/python-common/ceph/cryptotools/cryptotools.py
src/python-common/ceph/cryptotools/remote.py

index 926c35cc2dd8ae5aebb5418e93d7c13ea8a56f09..ac3aee461487c4317c6adcdb8eeca0f593df644f 100644 (file)
@@ -12,7 +12,6 @@ from datetime import datetime, timedelta
 from string import ascii_lowercase, ascii_uppercase, digits, punctuation
 from typing import List, Optional, Sequence
 
-import bcrypt
 from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand
 from mgr_util import password_hash
 
@@ -24,6 +23,8 @@ from ..exceptions import PasswordPolicyException, PermissionNotValid, \
 from ..security import Permission, Scope
 from ..settings import Settings
 
+import ceph.cryptotools.remote
+
 logger = logging.getLogger('access_control')
 DEFAULT_FILE_DESC = 'password/secret'
 
@@ -929,7 +930,10 @@ def ac_user_set_password_hash(_, username: str, inbuf: str):
     hashed_password = inbuf
     try:
         # make sure the hashed_password is actually a bcrypt hash
-        bcrypt.checkpw(b'', hashed_password.encode('utf-8'))
+        # catch a ValueError if hashed_password is not valid.
+        cc = ceph.cryptotools.remote.CryptoCaller()
+        cc.verify_password('', hashed_password)
+
         user = mgr.ACCESS_CTRL_DB.get_user(username)
         user.set_password_hash(hashed_password)
 
index 0b2dc828b7986e1182f4ca2e387bec2c34593ebc..2610213525084861f81aa88a8ab3704afebdd57c 100644 (file)
@@ -33,6 +33,17 @@ def password_hash(args: Namespace) -> None:
     json.dump({'hash': hash_str}, sys.stdout)
 
 
+def verify_password(args: Namespace) -> None:
+    data = json.loads(sys.stdin.read())
+    password = data.encode('utf-8')
+    hashed_password = data.encode('utf-8')
+    try:
+        ok = bcrypt.checkpw(password, hashed_password)
+    except ValueError as err:
+        _fail_message(str(err))
+    json.dump({'ok': ok}, sys.stdout)
+
+
 def create_self_signed_cert(args: Namespace) -> None:
 
     # Generate private key
@@ -192,6 +203,10 @@ if __name__ == "__main__":
     parser_verify_tls = subparsers.add_parser('verify_tls')
     parser_verify_tls.set_defaults(func=verify_tls)
 
+    # password verification
+    parser_verify_password = subparsers.add_parser('verify_password')
+    parser_verify_password.set_defaults(func=verify_password)
+
     # parse the args and call whatever function was selected
     args = parser.parse_args()
     args.func(args)
index 6271288e4f8ce5c01df5ae1d27e75cebe83efe33..40e01d199128dbf05df82033bd87b0ff9c430ac6 100644 (file)
@@ -167,3 +167,18 @@ class CryptoCaller:
         if not pw_hash:
             raise CryptoCallError('no password hash')
         return pw_hash
+
+    def verify_password(self, password: str, hashed_password: str) -> bool:
+        """Verify a password matches the hashed password. Returns true if
+        password and hashed_password match.
+        """
+        pwdata = {"password": password, "hashed_password": hashed_password}
+        result = self._run(
+            ["verify_password"],
+            input_data=json.dumps(pwdata),
+            capture_output=True,
+            check=True,
+        )
+        result_obj = self._result_json(result)
+        ok = result_obj.get("ok", False)
+        return ok