]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: fixing nvmeof section in cert_mgr UT + new UT
authorRedouane Kachach <rkachach@ibm.com>
Thu, 7 Aug 2025 13:57:41 +0000 (15:57 +0200)
committerRedouane Kachach <rkachach@ibm.com>
Sat, 6 Sep 2025 21:39:46 +0000 (23:39 +0200)
Signed-off-by: Redouane Kachach <rkachach@ibm.com>
src/pybind/mgr/cephadm/tests/test_certmgr.py

index d71d769f4398355f517d59e1e724e579a4957ccf..32bca8f21133333b4d5fbc36c49ae694372d4336 100644 (file)
@@ -5,7 +5,7 @@ import json
 from tests import mock
 import logging
 
-from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectException, TLSObjectProtocol
+from cephadm.tlsobject_types import Cert, PrivKey, TLSObjectException, TLSObjectProtocol, CertKeyPair
 from cephadm.tlsobject_store import TLSOBJECT_STORE_PREFIX, TLSObjectStore, TLSObjectScope
 from cephadm.module import CephadmOrchestrator
 from cephadm.cert_mgr import CertInfo, CertMgr
@@ -301,12 +301,12 @@ class TestCertMgr(object):
 
         rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
         nvmeof_client_cert = 'fake-nvmeof-client-cert'
-        nvmeof_server_cert = 'fake-nvmeof-server-cert'
+        nvmeof_ssl_cert = 'fake-nvmeof-ssl-cert'
         nvmeof_root_ca_cert = 'fake-nvmeof-root-ca-cert'
         grafana_cert_host_1 = 'grafana-cert-host-1'
         grafana_cert_host_2 = 'grafana-cert-host-2'
         cephadm_module.cert_mgr.save_cert('rgw_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
-        cephadm_module.cert_mgr.save_cert('nvmeof_server_cert', nvmeof_server_cert, service_name='nvmeof.foo', user_made=True)
+        cephadm_module.cert_mgr.save_cert('nvmeof_ssl_cert', nvmeof_ssl_cert, service_name='nvmeof.self-signed.foo', user_made=False)
         cephadm_module.cert_mgr.save_cert('nvmeof_client_cert', nvmeof_client_cert, service_name='nvmeof.foo', user_made=True)
         cephadm_module.cert_mgr.save_cert('nvmeof_root_ca_cert', nvmeof_root_ca_cert, service_name='nvmeof.foo', user_made=True)
         cephadm_module.cert_mgr.save_cert('grafana_ssl_cert', grafana_cert_host_1, host='host-1', user_made=True)
@@ -314,7 +314,7 @@ class TestCertMgr(object):
 
         expected_calls = [
             mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}rgw_ssl_cert', json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()})),
-            mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_server_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_server_cert, True).to_json()})),
+            mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_ssl_cert', json.dumps({'nvmeof.self-signed.foo': Cert(nvmeof_ssl_cert, False).to_json()})),
             mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_client_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_client_cert, True).to_json()})),
             mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}nvmeof_root_ca_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_root_ca_cert, True).to_json()})),
             mock.call(f'{TLSOBJECT_STORE_CERT_PREFIX}grafana_ssl_cert', json.dumps({'host-1': Cert(grafana_cert_host_1, True).to_json()})),
@@ -365,7 +365,7 @@ class TestCertMgr(object):
             }
 
         def compare_certls_dicts(expected_ls):
-            actual_ls = cephadm_module.cert_mgr.cert_ls(include_datails=True)
+            actual_ls = cephadm_module.cert_mgr.cert_ls(include_details=True, include_cephadm_signed=True)
             assert actual_ls.keys() == expected_ls.keys()
             for svc_cert_name, value in expected_ls.items():
                 expected_certs_entry = value['certificates']
@@ -450,8 +450,8 @@ class TestCertMgr(object):
 
         # nvmeof certificates
         cephadm_module.cert_mgr.save_cert('nvmeof_client_cert', CEPHADM_SELF_GENERATED_CERT_1, service_name='nvmeof.foo', user_made=True)
-        cephadm_module.cert_mgr.save_cert('nvmeof_server_cert', CEPHADM_SELF_GENERATED_CERT_1, service_name='nvmeof.foo', user_made=True)
         cephadm_module.cert_mgr.save_cert('nvmeof_root_ca_cert', CEPHADM_SELF_GENERATED_CERT_2, service_name='nvmeof.foo', user_made=True)
+        cephadm_module.cert_mgr.save_cert('nvmeof_ssl_cert', CEPHADM_SELF_GENERATED_CERT_1, service_name='nvmeof.self-signed.foo', user_made=False)
         expected_ls.update(
             {
                 "nvmeof_client_cert": {
@@ -460,53 +460,95 @@ class TestCertMgr(object):
                         "nvmeof.foo": get_generated_cephadm_cert_info_1(),
                     },
                 },
-                "nvmeof_server_cert": {
+                "nvmeof_root_ca_cert": {
                     "scope": "service",
                     "certificates": {
-                        "nvmeof.foo": get_generated_cephadm_cert_info_1(),
+                        "nvmeof.foo": get_generated_cephadm_cert_info_2(),
                     },
                 },
-                "nvmeof_root_ca_cert": {
+                "nvmeof_ssl_cert": {
                     "scope": "service",
                     "certificates": {
-                        "nvmeof.foo": get_generated_cephadm_cert_info_2(),
+                        "nvmeof.self-signed.foo": get_generated_cephadm_cert_info_1(),
                     },
                 },
             }
         )
         compare_certls_dicts(expected_ls)
 
+    def test_cephadm_signed_with_label_host_scope(self, cephadm_module):
+        """
+        Ensure cephadm-signed <service>__<label> names work end-to-end
+        (stored under HOST scope automatically, retrievable, and visible in cert_ls).
+        """
+        cm: CertMgr = cephadm_module.cert_mgr
+
+        svc = "mgmt-gateway"
+        cert_label = "internal"
+        host = "host-1"
+
+        # Register the self-signed pair for this (service,label)
+        cm.register_self_signed_cert_key_pair(svc, label=cert_label)
+
+        # Save (simulate cephadm-generated) cert/key at host target
+        cm.save_self_signed_cert_key_pair(
+            svc,
+            CertKeyPair(CEPHADM_SELF_GENERATED_CERT_1, CEPHADM_SELF_GENERATED_KEY_2048),
+            host=host,
+            label=cert_label,
+        )
+
+        cert_name = cm.self_signed_cert(svc, cert_label)
+        key_name = cm.self_signed_key(svc, cert_label)
+
+        # Stored under HOST scope and retrievable by (host)
+        assert cm.cert_exists(cert_name, host=host) is True
+        assert cm.get_cert(cert_name, host=host) == CEPHADM_SELF_GENERATED_CERT_1
+        assert cm.get_key(key_name, host=host) == CEPHADM_SELF_GENERATED_KEY_2048
+
+        # Scope detection for cephadm-signed objects should be HOST
+        assert cm.get_cert_scope(cert_name) == TLSObjectScope.HOST
+        assert cm.get_key_scope(key_name) == TLSObjectScope.HOST
+
+        # Parsing back the service name from the cert name should ignore the label
+        assert cm.service_name_from_cert(cert_name) == svc
+
+        # Verify listing includes the cephadm-signed entry when requested
+        ls = cm.cert_ls(include_details=True, include_cephadm_signed=True)
+        assert cert_name in ls
+        assert ls[cert_name]["scope"] == "host"
+        assert host in ls[cert_name]["certificates"]
+        # sanity on validity fields (already exercised elsewhere; just presence here)
+        assert "validity" in ls[cert_name]["certificates"][host]
+
     @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
     def test_tlsobject_store_save_key(self, _set_store, cephadm_module: CephadmOrchestrator):
 
         grafana_host1_key = 'fake-grafana-host1-key'
         grafana_host2_key = 'fake-grafana-host2-key'
         nvmeof_client_key = 'nvmeof-client-key'
-        nvmeof_server_key = 'nvmeof-server-key'
-        nvmeof_encryption_key = 'nvmeof-encryption-key'
+        nvmeof_ssl_key = 'nvmeof-ssl-key'
         cephadm_module.cert_mgr.save_key('grafana_ssl_key', grafana_host1_key, host='host1')
         cephadm_module.cert_mgr.save_key('grafana_ssl_key', grafana_host2_key, host='host2')
         cephadm_module.cert_mgr.save_key('nvmeof_client_key', nvmeof_client_key, service_name='nvmeof.foo')
-        cephadm_module.cert_mgr.save_key('nvmeof_server_key', nvmeof_server_key, service_name='nvmeof.foo')
-        cephadm_module.cert_mgr.save_key('nvmeof_encryption_key', nvmeof_encryption_key, service_name='nvmeof.foo')
+        cephadm_module.cert_mgr.save_key('nvmeof_ssl_key', nvmeof_ssl_key, service_name='nvmeof.self-signed.foo')
 
         expected_calls = [
             mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}grafana_ssl_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json()})),
             mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}grafana_ssl_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json(),
                                                                                   'host2': PrivKey(grafana_host2_key).to_json()})),
             mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_client_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()})),
-            mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_server_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()})),
-            mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_encryption_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_encryption_key).to_json()})),
+            mock.call(f'{TLSOBJECT_STORE_KEY_PREFIX}nvmeof_ssl_key', json.dumps({'nvmeof.self-signed.foo': PrivKey(nvmeof_ssl_key).to_json()})),
         ]
         _set_store.assert_has_calls(expected_calls)
 
     @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
     def test_tlsobject_store_key_ls(self, _set_store, cephadm_module: CephadmOrchestrator):
         expected_ls = {
-            'nvmeof_server_key': {
+            'nvmeof_ssl_key': {
                 'scope': 'service',
                 'keys': {
-                    'nvmeof.foo': {
+                    'nvmeof.self-signed.foo': {
                         'key_type': 'RSA',
                         'key_size': 4096
                     }
@@ -520,21 +562,11 @@ class TestCertMgr(object):
                         'key_size': 4096
                     }
                 }
-            },
-            'nvmeof_encryption_key': {
-                'scope': 'service',
-                'keys': {
-                    'nvmeof.foo': {
-                        'key_type': 'RSA',
-                        'key_size': 2048
-                    }
-                }
             }
         }
 
         cephadm_module.cert_mgr.save_key('nvmeof_client_key', CEPHADM_SELF_GENERATED_KEY_4096, service_name='nvmeof.foo')
-        cephadm_module.cert_mgr.save_key('nvmeof_server_key', CEPHADM_SELF_GENERATED_KEY_4096, service_name='nvmeof.foo')
-        cephadm_module.cert_mgr.save_key('nvmeof_encryption_key', CEPHADM_SELF_GENERATED_KEY_2048, service_name='nvmeof.foo')
+        cephadm_module.cert_mgr.save_key('nvmeof_ssl_key', CEPHADM_SELF_GENERATED_KEY_4096, service_name='nvmeof.self-signed.foo')
         assert cephadm_module.cert_mgr.key_ls() == expected_ls
 
         cephadm_module.cert_mgr.save_key('ingress_ssl_key', 'invalid_key', service_name='ingress.foo')
@@ -546,7 +578,7 @@ class TestCertMgr(object):
         # Define certs and keys with their corresponding scopes
         certs = {
             'rgw_ssl_cert': ('rgw.foo', 'fake-rgw-cert', TLSObjectScope.SERVICE),
-            'nvmeof_server_cert': ('nvmeof.foo', 'nvmeof-server-cert', TLSObjectScope.SERVICE),
+            'nvmeof_ssl_cert': ('nvmeof.self-signed.foo', 'nvmeof-ssl-cert', TLSObjectScope.SERVICE),
             'nvmeof_client_cert': ('nvmeof.foo', 'nvmeof-client-cert', TLSObjectScope.SERVICE),
             'nvmeof_root_ca_cert': ('nvmeof.foo', 'nvmeof-root-ca-cert', TLSObjectScope.SERVICE),
             'ingress_ssl_cert': ('ingress', 'ingress-ssl-cert', TLSObjectScope.SERVICE),
@@ -564,9 +596,8 @@ class TestCertMgr(object):
 
         keys = {
             'grafana_ssl_key': ('host1', 'fake-grafana-host1-key', TLSObjectScope.HOST),
-            'nvmeof_server_key': ('nvmeof.foo', 'nvmeof-server-key', TLSObjectScope.SERVICE),
+            'nvmeof_ssl_key': ('nvmeof.self-signed.foo', 'nvmeof-ssl-key', TLSObjectScope.SERVICE),
             'nvmeof_client_key': ('nvmeof.foo', 'nvmeof-client-key', TLSObjectScope.SERVICE),
-            'nvmeof_encryption_key': ('nvmeof.foo', 'nvmeof-encryption-key', TLSObjectScope.SERVICE),
             'mgmt_gateway_ssl_key': ('mgmt-gateway', 'mgmt-gw-key', TLSObjectScope.GLOBAL),
             'oauth2_proxy_ssl_key': ('host1', 'oauth2-proxy', TLSObjectScope.HOST),
             'ingress_ssl_key': ('ingress', 'ingress-ssl-key', TLSObjectScope.SERVICE),
@@ -821,13 +852,13 @@ class TestCertMgr(object):
 
         rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert'
         nvmeof_client_cert = 'fake-nvmeof-client-cert'
-        nvmeof_server_cert = 'fake-nvmeof-server-cert'
+        nvmeof_ssl_cert = 'fake-nvmeof-ssl-cert'
         cephadm_module.cert_mgr.save_cert('rgw_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True)
-        cephadm_module.cert_mgr.save_cert('nvmeof_server_cert', nvmeof_server_cert, service_name='nvmeof.foo', user_made=True)
+        cephadm_module.cert_mgr.save_cert('nvmeof_ssl_cert', nvmeof_ssl_cert, service_name='nvmeof.self-signed.foo', user_made=False)
         cephadm_module.cert_mgr.save_cert('nvmeof_client_cert', nvmeof_client_cert, service_name='nvmeof.foo', user_made=True)
 
         assert cephadm_module.cert_mgr.get_cert('rgw_ssl_cert', service_name='rgw.foo') == rgw_frontend_rgw_foo_host2_cert
-        assert cephadm_module.cert_mgr.get_cert('nvmeof_server_cert', service_name='nvmeof.foo') == nvmeof_server_cert
+        assert cephadm_module.cert_mgr.get_cert('nvmeof_ssl_cert', service_name='nvmeof.self-signed.foo') == nvmeof_ssl_cert
         assert cephadm_module.cert_mgr.get_cert('nvmeof_client_cert', service_name='nvmeof.foo') == nvmeof_client_cert
         assert cephadm_module.cert_mgr.get_cert('grafana_ssl_cert', host='host1') is None
         assert cephadm_module.cert_mgr.get_cert('iscsi_ssl_cert', service_name='iscsi.foo') is None
@@ -841,17 +872,15 @@ class TestCertMgr(object):
             cephadm_module.cert_mgr.get_cert('rgw_ssl_cert', host='foo')
 
         grafana_host1_key = 'fake-grafana-host1-cert'
-        nvmeof_server_key = 'nvmeof-server-key'
-        nvmeof_encryption_key = 'nvmeof-encryption-key'
-        cephadm_module.cert_mgr.save_key('grafana_ssl_key', grafana_host1_key, host='host1')
+        nvmeof_client_key = 'nvmeof-client-key'
+        nvmeof_ssl_key = 'nvmeof-ssl-key'
         cephadm_module.cert_mgr.save_key('grafana_ssl_key', grafana_host1_key, host='host1')
-        cephadm_module.cert_mgr.save_key('nvmeof_server_key', nvmeof_server_key, service_name='nvmeof.foo')
-        cephadm_module.cert_mgr.save_key('nvmeof_encryption_key', nvmeof_encryption_key, service_name='nvmeof.foo')
+        cephadm_module.cert_mgr.save_key('nvmeof_client_key', nvmeof_client_key, service_name='nvmeof.foo')
+        cephadm_module.cert_mgr.save_key('nvmeof_ssl_key', nvmeof_ssl_key, service_name='nvmeof.self-signed.foo')
 
         assert cephadm_module.cert_mgr.get_key('grafana_ssl_key', host='host1') == grafana_host1_key
-        assert cephadm_module.cert_mgr.get_key('nvmeof_server_key', service_name='nvmeof.foo') == nvmeof_server_key
-        assert cephadm_module.cert_mgr.get_key('nvmeof_client_key', service_name='nvmeof.foo') is None
-        assert cephadm_module.cert_mgr.get_key('nvmeof_encryption_key', service_name='nvmeof.foo') == nvmeof_encryption_key
+        assert cephadm_module.cert_mgr.get_key('nvmeof_client_key', service_name='nvmeof.foo') == nvmeof_client_key
+        assert cephadm_module.cert_mgr.get_key('nvmeof_ssl_key', service_name='nvmeof.self-signed.foo') == nvmeof_ssl_key
 
         with pytest.raises(TLSObjectException, match='Attempted to access privkey for unknown TLS object name unknown_consumer'):
             cephadm_module.cert_mgr.get_key('unknown_consumer')
@@ -862,29 +891,25 @@ class TestCertMgr(object):
 
         # Save some certificates and ensure certificates are present
         cephadm_module.cert_mgr.save_cert('rgw_ssl_cert', 'fake-rgw-cert', service_name='rgw.foo', user_made=True)
-        cephadm_module.cert_mgr.save_cert('nvmeof_server_cert', 'fake-nvmeof-server-cert', service_name='nvmeof.foo', user_made=True)
+        cephadm_module.cert_mgr.save_cert('nvmeof_ssl_cert', 'fake-nvmeof-ssl-cert', service_name='nvmeof.self-signed.foo', user_made=False)
         assert cephadm_module.cert_mgr.get_cert('rgw_ssl_cert', service_name='rgw.foo') == 'fake-rgw-cert'
-        assert cephadm_module.cert_mgr.get_cert('nvmeof_server_cert', service_name='nvmeof.foo') == 'fake-nvmeof-server-cert'
+        assert cephadm_module.cert_mgr.get_cert('nvmeof_ssl_cert', service_name='nvmeof.self-signed.foo') == 'fake-nvmeof-ssl-cert'
 
         # Remove certificates and ensure certificates are removed
-        cephadm_module.cert_mgr.rm_cert('rgw_ssl_cert', service_name='rgw.foo')
-        cephadm_module.cert_mgr.rm_cert('nvmeof_server_cert', service_name='nvmeof.foo')
+        assert cephadm_module.cert_mgr.rm_cert('rgw_ssl_cert', service_name='rgw.foo') is True
+        assert cephadm_module.cert_mgr.rm_cert('nvmeof_ssl_cert', service_name='nvmeof.self-signed.foo') is True
         assert cephadm_module.cert_mgr.get_cert('rgw_ssl_cert', service_name='rgw.foo') is None
-        assert cephadm_module.cert_mgr.get_cert('nvmeof_server_cert', service_name='nvmeof.foo') is None
+        assert cephadm_module.cert_mgr.get_cert('nvmeof_ssl_cert', service_name='nvmeof.self-signed.foo') is None
 
     def test_tlsobject_store_rm_key(self, cephadm_module: CephadmOrchestrator):
 
         # Save some keys and ensure keys are present
         cephadm_module.cert_mgr.save_key('grafana_ssl_key', 'fake-grafana-host1-key', host='host1')
-        cephadm_module.cert_mgr.save_key('nvmeof_server_key', 'fake-nvmeof-server-key', service_name='nvmeof.foo')
         assert cephadm_module.cert_mgr.get_key('grafana_ssl_key', host='host1') == 'fake-grafana-host1-key'
-        assert cephadm_module.cert_mgr.get_key('nvmeof_server_key', service_name='nvmeof.foo') == 'fake-nvmeof-server-key'
 
         # Remove keys and ensure keys are removed
         cephadm_module.cert_mgr.rm_key('grafana_ssl_key', host='host1')
-        cephadm_module.cert_mgr.rm_key('nvmeof_server_key', service_name='nvmeof.foo')
         assert cephadm_module.cert_mgr.get_key('grafana_ssl_key', host='host1') is None
-        assert cephadm_module.cert_mgr.get_key('nvmeof_server_key', service_name='nvmeof.foo') is None
 
     @mock.patch("cephadm.module.CephadmOrchestrator.set_store")
     def test_expired_certificate_detection(self, _set_store, cephadm_module: CephadmOrchestrator):