]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: adding unit-test for nvmeof TLS bundle API
authorRedouane Kachach <rkachach@ibm.com>
Wed, 14 Jan 2026 15:16:20 +0000 (16:16 +0100)
committerRedouane Kachach <rkachach@ibm.com>
Thu, 5 Feb 2026 15:22:24 +0000 (16:22 +0100)
Signed-off-by: Redouane Kachach <rkachach@ibm.com>
src/pybind/mgr/cephadm/tests/test_services.py

index 5290328f2ebd0e309caf426964130ca209e2d5ea..4c32cc377ad6d28d3b7748d782d5b7863c6403f4 100644 (file)
@@ -8,6 +8,7 @@ import pytest
 
 from unittest.mock import Mock, MagicMock, call, patch, ANY
 
+from cephadm.services.nvmeof import NVMEOF_CLIENT_CERT_LABEL
 from cephadm.serve import CephadmServe
 from cephadm.services.service_registry import service_registry
 from cephadm.services.cephadmservice import MonService, CephadmDaemonDeploySpec
@@ -5764,3 +5765,334 @@ class TestMgmtGateway:
                     error_ok=True,
                     use_current_daemon_image=False,
                 )
+
+
+class TestNvmeofTLSBundle:
+    def _store_spec(self, cephadm_module: CephadmOrchestrator, spec: NvmeofServiceSpec) -> None:
+        # SpecStore in unit tests stores ServiceSpec objects directly.
+        cephadm_module.spec_store._specs[spec.service_name()] = spec
+        # Some SpecStore helpers expect spec_created to exist.
+        if hasattr(cephadm_module.spec_store, 'spec_created'):
+            cephadm_module.spec_store.spec_created[spec.service_name()] = datetime_now()
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_ssl_disabled(self, cephadm_module: CephadmOrchestrator):
+        """Test that SSL disabled returns empty bundle"""
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=False,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == ''
+        assert bundle.server_key == ''
+        assert bundle.client_cert == ''
+        assert bundle.client_key == ''
+        assert bundle.ca_cert == ''
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_inline_server_only(self, cephadm_module: CephadmOrchestrator):
+        """Test INLINE certificate source with server creds only (no mTLS)"""
+        server_cert = '-----BEGIN CERTIFICATE-----\nSERVER_CERT\n-----END CERTIFICATE-----'
+        server_key = '-----BEGIN PRIVATE KEY-----\nSERVER_KEY\n-----END PRIVATE KEY-----'
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='inline',
+            ssl_cert=server_cert,
+            ssl_key=server_key,
+            enable_auth=False,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == server_cert
+        assert bundle.server_key == server_key
+        assert bundle.client_cert == ''
+        assert bundle.client_key == ''
+        assert bundle.ca_cert == ''
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_inline_with_mtls(self, cephadm_module: CephadmOrchestrator):
+        """Test INLINE certificate source with mTLS enabled"""
+        server_cert = '-----BEGIN CERTIFICATE-----\nSERVER_CERT\n-----END CERTIFICATE-----'
+        server_key = '-----BEGIN PRIVATE KEY-----\nSERVER_KEY\n-----END PRIVATE KEY-----'
+        client_cert = '-----BEGIN CERTIFICATE-----\nCLIENT_CERT\n-----END CERTIFICATE-----'
+        client_key = '-----BEGIN PRIVATE KEY-----\nCLIENT_KEY\n-----END PRIVATE KEY-----'
+        ca_cert = '-----BEGIN CERTIFICATE-----\nCA_CERT\n-----END CERTIFICATE-----'
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='inline',
+            ssl_cert=server_cert,
+            ssl_key=server_key,
+            enable_auth=True,
+            client_cert=client_cert,
+            client_key=client_key,
+            root_ca_cert=ca_cert,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == server_cert
+        assert bundle.server_key == server_key
+        assert bundle.client_cert == client_cert
+        assert bundle.client_key == client_key
+        assert bundle.ca_cert == ca_cert
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_reference_server_only(self, cephadm_module: CephadmOrchestrator):
+        """Test REFERENCE certificate source with server creds only"""
+        server_cert = '-----BEGIN CERTIFICATE-----\nREF_SERVER_CERT\n-----END CERTIFICATE-----'
+        server_key = '-----BEGIN PRIVATE KEY-----\nREF_SERVER_KEY\n-----END PRIVATE KEY-----'
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='reference',
+            enable_auth=False,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+
+        # Mock cert_mgr.get_cert and get_key for SERVICE-scoped lookups
+        def _get_cert(name, service_name=None, host=None):
+            if name == nvmeof_service.cert_name:
+                return server_cert
+            return None
+
+        def _get_key(name, service_name=None, host=None):
+            if name == nvmeof_service.key_name:
+                return server_key
+            return None
+
+        with patch.object(cephadm_module.cert_mgr, 'get_cert', side_effect=_get_cert), \
+             patch.object(cephadm_module.cert_mgr, 'get_key', side_effect=_get_key):
+            bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == server_cert
+        assert bundle.server_key == server_key
+        assert bundle.client_cert == ''
+        assert bundle.client_key == ''
+        assert bundle.ca_cert == ''
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_reference_with_mtls(self, cephadm_module: CephadmOrchestrator):
+        """Test REFERENCE certificate source with mTLS enabled"""
+        server_cert = '-----BEGIN CERTIFICATE-----\nREF_SERVER_CERT\n-----END CERTIFICATE-----'
+        server_key = '-----BEGIN PRIVATE KEY-----\nREF_SERVER_KEY\n-----END PRIVATE KEY-----'
+        client_cert = '-----BEGIN CERTIFICATE-----\nREF_CLIENT_CERT\n-----END CERTIFICATE-----'
+        client_key = '-----BEGIN PRIVATE KEY-----\nREF_CLIENT_KEY\n-----END PRIVATE KEY-----'
+        ca_cert = '-----BEGIN CERTIFICATE-----\nREF_CA_CERT\n-----END CERTIFICATE-----'
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='reference',
+            enable_auth=True,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+
+        def _get_cert(name, service_name=None, host=None):
+            if name == nvmeof_service.cert_name:
+                return server_cert
+            if name == nvmeof_service.client_cert_name:
+                return client_cert
+            if name == nvmeof_service.ca_cert_name:
+                return ca_cert
+            return None
+
+        def _get_key(name, service_name=None, host=None):
+            if name == nvmeof_service.key_name:
+                return server_key
+            if name == nvmeof_service.client_key_name:
+                return client_key
+            return None
+
+        with patch.object(cephadm_module.cert_mgr, 'get_cert', side_effect=_get_cert), \
+             patch.object(cephadm_module.cert_mgr, 'get_key', side_effect=_get_key):
+            bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == server_cert
+        assert bundle.server_key == server_key
+        assert bundle.client_cert == client_cert
+        assert bundle.client_key == client_key
+        assert bundle.ca_cert == ca_cert
+
+    @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service")
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_cephadm_signed_server_only(self, _pick_host, cephadm_module: CephadmOrchestrator):
+        """Test CEPHADM_SIGNED certificate source without mTLS"""
+        _pick_host.return_value = 'test-host'
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='cephadm-signed',
+            enable_auth=False,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        server_creds = TLSCredentials(
+            cert=ceph_generated_cert,
+            key=ceph_generated_key,
+            ca_cert=cephadm_root_ca,
+        )
+
+        def _get_self_signed(service_name, hostname, label=None):
+            assert label is None
+            return server_creds
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        with patch.object(cephadm_module.cert_mgr, 'get_self_signed_tls_credentials', side_effect=_get_self_signed):
+            bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == ceph_generated_cert
+        assert bundle.server_key == ceph_generated_key
+        assert bundle.ca_cert == cephadm_root_ca
+        assert bundle.client_cert == ''
+        assert bundle.client_key == ''
+
+    @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service")
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_cephadm_signed_with_mtls(self, _pick_host, cephadm_module: CephadmOrchestrator):
+        """Test CEPHADM_SIGNED certificate source with mTLS enabled"""
+        _pick_host.return_value = 'test-host'
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='cephadm-signed',
+            enable_auth=True,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        server_creds = TLSCredentials(
+            cert=ceph_generated_cert,
+            key=ceph_generated_key,
+            ca_cert=cephadm_root_ca,
+        )
+        client_creds = TLSCredentials(
+            cert='-----BEGIN CERTIFICATE-----\nCLIENT_CERT\n-----END CERTIFICATE-----',
+            key='-----BEGIN PRIVATE KEY-----\nCLIENT_KEY\n-----END PRIVATE KEY-----',
+            ca_cert=cephadm_root_ca,
+        )
+
+        def _get_self_signed(service_name, hostname, label=None):
+            if label == NVMEOF_CLIENT_CERT_LABEL:
+                return client_creds
+            assert label is None
+            return server_creds
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        with patch.object(cephadm_module.cert_mgr, 'get_self_signed_tls_credentials', side_effect=_get_self_signed):
+            bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == ceph_generated_cert
+        assert bundle.server_key == ceph_generated_key
+        assert bundle.client_cert == client_creds.cert
+        assert bundle.client_key == client_creds.key
+        # API returns the CA from server_creds
+        assert bundle.ca_cert == cephadm_root_ca
+
+    @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service")
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_cephadm_signed_no_hostname(self, _pick_host, cephadm_module: CephadmOrchestrator):
+        """Test CEPHADM_SIGNED returns None when no hostname can be resolved"""
+        _pick_host.return_value = None
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='cephadm-signed',
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+        assert bundle is None
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_service_not_found(self, cephadm_module: CephadmOrchestrator):
+        """Test that None is returned when service doesn't exist"""
+        nvmeof_service = NvmeofService(cephadm_module)
+        bundle = nvmeof_service.get_nvmeof_tls_bundle('nvmeof.nonexistent')
+        assert bundle is None
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_unknown_cert_source(self, cephadm_module: CephadmOrchestrator):
+        """Test that None is returned for unknown certificate_source"""
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='unknown-source',
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+        assert bundle is None
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_reference_missing_certs(self, cephadm_module: CephadmOrchestrator):
+        """Test REFERENCE source when certs are not stored in certmgr"""
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='reference',
+            enable_auth=True,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+
+        # Mock get_cert and get_key to return None for all lookups
+        with patch.object(cephadm_module.cert_mgr, 'get_cert', return_value=None), \
+             patch.object(cephadm_module.cert_mgr, 'get_key', return_value=None):
+            bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == ''
+        assert bundle.server_key == ''
+        assert bundle.client_cert == ''
+        assert bundle.client_key == ''
+        assert bundle.ca_cert == ''