]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: adding new UT to cover the new nvmeof API
authorRedouane Kachach <rkachach@ibm.com>
Tue, 10 Mar 2026 11:27:30 +0000 (12:27 +0100)
committerRedouane Kachach <rkachach@ibm.com>
Tue, 14 Apr 2026 12:17:29 +0000 (14:17 +0200)
Fixes: https://tracker.ceph.com/issues/74377
Signed-off-by: Redouane Kachach <rkachach@ibm.com>
src/pybind/mgr/cephadm/tests/services/test_nvmeof.py

index b255f92531fd3014c8a44b1dda9ec81436a37fd2..326421a71b660b80cd7180de1528d9ea4bfd43ed 100644 (file)
@@ -2,14 +2,23 @@ import json
 import pytest
 from unittest.mock import MagicMock, patch
 from typing import Dict, List
+from ceph.utils import datetime_now
 
-from cephadm.services.nvmeof import NvmeofService
+from cephadm.services.nvmeof import NvmeofService, NVMEOF_CLIENT_CERT_LABEL
 from cephadm.module import CephadmOrchestrator
 from ceph.deployment.service_spec import NvmeofServiceSpec
 from cephadm.tests.fixtures import with_host, with_service, _run_cephadm, async_side_effect
 from orchestrator import OrchestratorError
 from cephadm.tlsobject_types import TLSCredentials
 
+cephadm_root_ca = """-----BEGIN CERTIFICATE-----\nMIIE7DCCAtSgAwIBAgIUE8b2zZ64geu2ns3Zfn3/4L+Cf6MwDQYJKoZIhvcNAQEL\nBQAwFzEVMBMGA1UEAwwMY2VwaGFkbS1yb290MB4XDTI0MDYyNjE0NDA1M1oXDTM0\nMDYyNzE0NDA1M1owFzEVMBMGA1UEAwwMY2VwaGFkbS1yb290MIICIjANBgkqhkiG\n9w0BAQEFAAOCAg8AMIICCgKCAgEAsZRJsdtTr9GLG1lWFql5SGc46ldFanNJd1Gl\nqXq5vgZVKRDTmNgAb/XFuNEEmbDAXYIRZolZeYKMHfn0pouPRSel0OsC6/02ZUOW\nIuN89Wgo3IYleCFpkVIumD8URP3hwdu85plRxYZTtlruBaTRH38lssyCqxaOdEt7\nAUhvYhcMPJThB17eOSQ73mb8JEC83vB47fosI7IhZuvXvRSuZwUW30rJanWNhyZq\neS2B8qw2RSO0+77H6gA4ftBnitfsE1Y8/F9Z/f92JOZuSMQXUB07msznPbRJia3f\nueO8gOc32vxd1A1/Qzp14uX34yEGY9ko2lW226cZO29IVUtXOX+LueQttwtdlpz8\ne6Npm09pXhXAHxV/OW3M28MdXmobIqT/m9MfkeAErt5guUeC5y8doz6/3VQRjFEn\nRpN0WkblgnNAQ3DONPc+Qd9Fi/wZV2X7bXoYpNdoWDsEOiE/eLmhG1A2GqU/mneP\nzQ6u79nbdwTYpwqHpa+PvusXeLfKauzI8lLUJotdXy9EK8iHUofibB61OljYye6B\nG3b8C4QfGsw8cDb4APZd/6AZYyMx/V3cGZ+GcOV7WvsC8k7yx5Uqasm/kiGQ3EZo\nuNenNEYoGYrjb8D/8QzqNUTwlEh27/ps80tO7l2GGTvWVZL0PRZbmLDvO77amtOf\nOiRXMoUCAwEAAaMwMC4wGwYDVR0RBBQwEocQAAAAAAAAAAAAAAAAAAAAATAPBgNV\nHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4ICAQAxwzX5AhYEWhTV4VUwUj5+\nqPdl4Q2tIxRokqyE+cDxoSd+6JfGUefUbNyBxDt0HaBq8obDqqrbcytxnn7mpnDu\nhtiauY+I4Amt7hqFOiFA4cCLi2mfok6g2vL53tvhd9IrsfflAU2wy7hL76Ejm5El\nA+nXlkJwps01Whl9pBkUvIbOn3pXX50LT4hb5zN0PSu957rjd2xb4HdfuySm6nW4\n4GxtVWfmGA6zbC4XMEwvkuhZ7kD2qjkAguGDF01uMglkrkCJT3OROlNBuSTSBGqt\ntntp5VytHvb7KTF7GttM3ha8/EU2KYaHM6WImQQTrOfiImAktOk4B3lzUZX3HYIx\n+sByO4P4dCvAoGz1nlWYB2AvCOGbKf0Tgrh4t4jkiF8FHTXGdfvWmjgi1pddCNAy\nn65WOCmVmLZPERAHOk1oBwqyReSvgoCFo8FxbZcNxJdlhM0Z6hzKggm3O3Dl88Xl\n5euqJjh2STkBW8Xuowkg1TOs5XyWvKoDFAUzyzeLOL8YSG+gXV22gPTUaPSVAqdb\nwd0Fx2kjConuC5bgTzQHs8XWA930U3XWZraj21Vaa8UxlBLH4fUro8H5lMSYlZNE\nJHRNW8BkznAClaFSDG3dybLsrzrBFAu/Qb5zVkT1xyq0YkepGB7leXwq6vjWA5Pw\nmZbKSphWfh0qipoqxqhfkw==\n-----END CERTIFICATE-----\n"""
+
+
+ceph_generated_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQDIZSujNBlKaLJzmvntjukjMA0GCSqGSIb3DQEBDQUAMCExDTAL\nBgNVBAoMBENlcGgxEDAOBgNVBAMMB2NlcGhhZG0wHhcNMjIwNzEzMTE0NzA3WhcN\nMzIwNzEwMTE0NzA3WjAhMQ0wCwYDVQQKDARDZXBoMRAwDgYDVQQDDAdjZXBoYWRt\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyyMe4DMA+MeYK7BHZMHB\nq7zjliEOcNgxomjU8qbf5USF7Mqrf6+/87XWqj4pCyAW8x0WXEr6A56a+cmBVmt+\nqtWDzl020aoId6lL5EgLLn6/kMDCCJLq++Lg9cEofMSvcZh+lY2f+1p+C+00xent\nrLXvXGOilAZWaQfojT2BpRnNWWIFbpFwlcKrlg2G0cFjV5c1m6a0wpsQ9JHOieq0\nSvwCixajwq3CwAYuuiU1wjI4oJO4Io1+g8yB3nH2Mo/25SApCxMXuXh4kHLQr/T4\n4hqisvG4uJYgKMcSIrWj5o25mclByGi1UI/kZkCUES94i7Z/3ihx4Bad0AMs/9tw\nFwIDAQABMA0GCSqGSIb3DQEBDQUAA4IBAQAf+pwz7Gd7mDwU2LY0TQXsK6/8KGzh\nHuX+ErOb8h5cOAbvCnHjyJFWf6gCITG98k9nxU9NToG0WYuNm/max1y/54f0dtxZ\npUo6KSNl3w6iYCfGOeUIj8isi06xMmeTgMNzv8DYhDt+P2igN6LenqWTVztogkiV\nxQ5ZJFFLEw4sN0CXnrZX3t5ruakxLXLTLKeE0I91YJvjClSBGkVJq26wOKQNHMhx\npWxeydQ5EgPZY+Aviz5Dnxe8aB7oSSovpXByzxURSabOuCK21awW5WJCGNpmqhWK\nZzACBDEstccj57c4OGV0eayHJRsluVr2e9NHRINZA3qdB37e6gsI1xHo\n-----END CERTIFICATE-----\n"""
+
+
+ceph_generated_key = """-----BEGIN PRIVATE KEY-----\nMIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQDLIx7gMwD4x5gr\nsEdkwcGrvOOWIQ5w2DGiaNTypt/lRIXsyqt/r7/ztdaqPikLIBbzHRZcSvoDnpr5\nyYFWa36q1YPOXTbRqgh3qUvkSAsufr+QwMIIkur74uD1wSh8xK9xmH6VjZ/7Wn4L\n7TTF6e2ste9cY6KUBlZpB+iNPYGlGc1ZYgVukXCVwquWDYbRwWNXlzWbprTCmxD0\nkc6J6rRK/AKLFqPCrcLABi66JTXCMjigk7gijX6DzIHecfYyj/blICkLExe5eHiQ\nctCv9PjiGqKy8bi4liAoxxIitaPmjbmZyUHIaLVQj+RmQJQRL3iLtn/eKHHgFp3Q\nAyz/23AXAgMBAAECggEAVoTB3Mm8azlPlaQB9GcV3tiXslSn+uYJ1duCf0sV52dV\nBzKW8s5fGiTjpiTNhGCJhchowqxoaew+o47wmGc2TvqbpeRLuecKrjScD0GkCYyQ\neM2wlshEbz4FhIZdgS6gbuh9WaM1dW/oaZoBNR5aTYo7xYTmNNeyLA/jO2zr7+4W\n5yES1lMSBXpKk7bDGKYY4bsX2b5RLr2Grh2u2bp7hoLABCEvuu8tSQdWXLEXWpXo\njwmV3hc6tabypIa0mj2Dmn2Dmt1ppSO0AZWG/WAizN3f4Z0r/u9HnbVrVmh0IEDw\n3uf2LP5o3msG9qKCbzv3lMgt9mMr70HOKnJ8ohMSKQKBgQDLkNb+0nr152HU9AeJ\nvdz8BeMxcwxCG77iwZphZ1HprmYKvvXgedqWtS6FRU+nV6UuQoPUbQxJBQzrN1Qv\nwKSlOAPCrTJgNgF/RbfxZTrIgCPuK2KM8I89VZv92TSGi362oQA4MazXC8RAWjoJ\nSu1/PHzK3aXOfVNSLrOWvIYeZQKBgQD/dgT6RUXKg0UhmXj7ExevV+c7oOJTDlMl\nvLngrmbjRgPO9VxLnZQGdyaBJeRngU/UXfNgajT/MU8B5fSKInnTMawv/tW7634B\nw3v6n5kNIMIjJmENRsXBVMllDTkT9S7ApV+VoGnXRccbTiDapBThSGd0wri/CuwK\nNWK1YFOeywKBgEDyI/XG114PBUJ43NLQVWm+wx5qszWAPqV/2S5MVXD1qC6zgCSv\nG9NLWN1CIMimCNg6dm7Wn73IM7fzvhNCJgVkWqbItTLG6DFf3/DPODLx1wTMqLOI\nqFqMLqmNm9l1Nec0dKp5BsjRQzq4zp1aX21hsfrTPmwjxeqJZdioqy2VAoGAXR5X\nCCdSHlSlUW8RE2xNOOQw7KJjfWT+WAYoN0c7R+MQplL31rRU7dpm1bLLRBN11vJ8\nMYvlT5RYuVdqQSP6BkrX+hLJNBvOLbRlL+EXOBrVyVxHCkDe+u7+DnC4epbn+N8P\nLYpwqkDMKB7diPVAizIKTBxinXjMu5fkKDs5n+sCgYBbZheYKk5M0sIxiDfZuXGB\nkf4mJdEkTI1KUGRdCwO/O7hXbroGoUVJTwqBLi1tKqLLarwCITje2T200BYOzj82\nqwRkCXGtXPKnxYEEUOiFx9OeDrzsZV00cxsEnX0Zdj+PucQ/J3Cvd0dWUspJfLHJ\n39gnaegswnz9KMQAvzKFdg==\n-----END PRIVATE KEY-----\n"""
+
 
 class FakeInventory:
     def get_addr(self, name: str) -> str:
@@ -685,3 +694,334 @@ timeout = 1.0
                     error_ok=True,
                     use_current_daemon_image=False,
                 )
+
+
+class TestNvmeofTLSBundle:
+    def _store_spec(self, cephadm_module: CephadmOrchestrator, spec: NvmeofServiceSpec) -> None:
+        # SpecStore in unit tests stores ServiceSpec objects directly.
+        cephadm_module.spec_store._specs[spec.service_name()] = spec
+        # Some SpecStore helpers expect spec_created to exist.
+        if hasattr(cephadm_module.spec_store, 'spec_created'):
+            cephadm_module.spec_store.spec_created[spec.service_name()] = datetime_now()
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_ssl_disabled(self, cephadm_module: CephadmOrchestrator):
+        """Test that SSL disabled returns empty bundle"""
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=False,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == ''
+        assert bundle.server_key == ''
+        assert bundle.client_cert == ''
+        assert bundle.client_key == ''
+        assert bundle.ca_cert == ''
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_inline_server_only(self, cephadm_module: CephadmOrchestrator):
+        """Test INLINE certificate source with server creds only (no mTLS)"""
+        server_cert = '-----BEGIN CERTIFICATE-----\nSERVER_CERT\n-----END CERTIFICATE-----'
+        server_key = '-----BEGIN PRIVATE KEY-----\nSERVER_KEY\n-----END PRIVATE KEY-----'
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='inline',
+            ssl_cert=server_cert,
+            ssl_key=server_key,
+            enable_auth=False,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == server_cert
+        assert bundle.server_key == server_key
+        assert bundle.client_cert == ''
+        assert bundle.client_key == ''
+        assert bundle.ca_cert == ''
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_inline_with_mtls(self, cephadm_module: CephadmOrchestrator):
+        """Test INLINE certificate source with mTLS enabled"""
+        server_cert = '-----BEGIN CERTIFICATE-----\nSERVER_CERT\n-----END CERTIFICATE-----'
+        server_key = '-----BEGIN PRIVATE KEY-----\nSERVER_KEY\n-----END PRIVATE KEY-----'
+        client_cert = '-----BEGIN CERTIFICATE-----\nCLIENT_CERT\n-----END CERTIFICATE-----'
+        client_key = '-----BEGIN PRIVATE KEY-----\nCLIENT_KEY\n-----END PRIVATE KEY-----'
+        ca_cert = '-----BEGIN CERTIFICATE-----\nCA_CERT\n-----END CERTIFICATE-----'
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='inline',
+            ssl_cert=server_cert,
+            ssl_key=server_key,
+            enable_auth=True,
+            client_cert=client_cert,
+            client_key=client_key,
+            root_ca_cert=ca_cert,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == server_cert
+        assert bundle.server_key == server_key
+        assert bundle.client_cert == client_cert
+        assert bundle.client_key == client_key
+        assert bundle.ca_cert == ca_cert
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_reference_server_only(self, cephadm_module: CephadmOrchestrator):
+        """Test REFERENCE certificate source with server creds only"""
+        server_cert = '-----BEGIN CERTIFICATE-----\nREF_SERVER_CERT\n-----END CERTIFICATE-----'
+        server_key = '-----BEGIN PRIVATE KEY-----\nREF_SERVER_KEY\n-----END PRIVATE KEY-----'
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='reference',
+            enable_auth=False,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+
+        # Mock cert_mgr.get_cert and get_key for SERVICE-scoped lookups
+        def _get_cert(name, service_name=None, host=None):
+            if name == nvmeof_service.cert_name:
+                return server_cert
+            return None
+
+        def _get_key(name, service_name=None, host=None):
+            if name == nvmeof_service.key_name:
+                return server_key
+            return None
+
+        with patch.object(cephadm_module.cert_mgr, 'get_cert', side_effect=_get_cert), \
+             patch.object(cephadm_module.cert_mgr, 'get_key', side_effect=_get_key):
+            bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == server_cert
+        assert bundle.server_key == server_key
+        assert bundle.client_cert == ''
+        assert bundle.client_key == ''
+        assert bundle.ca_cert == ''
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_reference_with_mtls(self, cephadm_module: CephadmOrchestrator):
+        """Test REFERENCE certificate source with mTLS enabled"""
+        server_cert = '-----BEGIN CERTIFICATE-----\nREF_SERVER_CERT\n-----END CERTIFICATE-----'
+        server_key = '-----BEGIN PRIVATE KEY-----\nREF_SERVER_KEY\n-----END PRIVATE KEY-----'
+        client_cert = '-----BEGIN CERTIFICATE-----\nREF_CLIENT_CERT\n-----END CERTIFICATE-----'
+        client_key = '-----BEGIN PRIVATE KEY-----\nREF_CLIENT_KEY\n-----END PRIVATE KEY-----'
+        ca_cert = '-----BEGIN CERTIFICATE-----\nREF_CA_CERT\n-----END CERTIFICATE-----'
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='reference',
+            enable_auth=True,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+
+        def _get_cert(name, service_name=None, host=None):
+            if name == nvmeof_service.cert_name:
+                return server_cert
+            if name == nvmeof_service.client_cert_name:
+                return client_cert
+            if name == nvmeof_service.ca_cert_name:
+                return ca_cert
+            return None
+
+        def _get_key(name, service_name=None, host=None):
+            if name == nvmeof_service.key_name:
+                return server_key
+            if name == nvmeof_service.client_key_name:
+                return client_key
+            return None
+
+        with patch.object(cephadm_module.cert_mgr, 'get_cert', side_effect=_get_cert), \
+             patch.object(cephadm_module.cert_mgr, 'get_key', side_effect=_get_key):
+            bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == server_cert
+        assert bundle.server_key == server_key
+        assert bundle.client_cert == client_cert
+        assert bundle.client_key == client_key
+        assert bundle.ca_cert == ca_cert
+
+    @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service")
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_cephadm_signed_server_only(self, _pick_host, cephadm_module: CephadmOrchestrator):
+        """Test CEPHADM_SIGNED certificate source without mTLS"""
+        _pick_host.return_value = 'test-host'
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='cephadm-signed',
+            enable_auth=False,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        server_creds = TLSCredentials(
+            cert=ceph_generated_cert,
+            key=ceph_generated_key,
+            ca_cert=cephadm_root_ca,
+        )
+
+        def _get_self_signed(service_name, hostname, label=None):
+            assert label is None
+            return server_creds
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        with patch.object(cephadm_module.cert_mgr, 'get_self_signed_tls_credentials', side_effect=_get_self_signed):
+            bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == ceph_generated_cert
+        assert bundle.server_key == ceph_generated_key
+        assert bundle.ca_cert == cephadm_root_ca
+        assert bundle.client_cert == ''
+        assert bundle.client_key == ''
+
+    @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service")
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_cephadm_signed_with_mtls(self, _pick_host, cephadm_module: CephadmOrchestrator):
+        """Test CEPHADM_SIGNED certificate source with mTLS enabled"""
+        _pick_host.return_value = 'test-host'
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='cephadm-signed',
+            enable_auth=True,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        server_creds = TLSCredentials(
+            cert=ceph_generated_cert,
+            key=ceph_generated_key,
+            ca_cert=cephadm_root_ca,
+        )
+        client_creds = TLSCredentials(
+            cert='-----BEGIN CERTIFICATE-----\nCLIENT_CERT\n-----END CERTIFICATE-----',
+            key='-----BEGIN PRIVATE KEY-----\nCLIENT_KEY\n-----END PRIVATE KEY-----',
+            ca_cert=cephadm_root_ca,
+        )
+
+        def _get_self_signed(service_name, hostname, label=None):
+            if label == NVMEOF_CLIENT_CERT_LABEL:
+                return client_creds
+            assert label is None
+            return server_creds
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        with patch.object(cephadm_module.cert_mgr, 'get_self_signed_tls_credentials', side_effect=_get_self_signed):
+            bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == ceph_generated_cert
+        assert bundle.server_key == ceph_generated_key
+        assert bundle.client_cert == client_creds.cert
+        assert bundle.client_key == client_creds.key
+        # API returns the CA from server_creds
+        assert bundle.ca_cert == cephadm_root_ca
+
+    @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service")
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_cephadm_signed_no_hostname(self, _pick_host, cephadm_module: CephadmOrchestrator):
+        """Test CEPHADM_SIGNED returns None when no hostname can be resolved"""
+        _pick_host.return_value = None
+
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='cephadm-signed',
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+        assert bundle is None
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_service_not_found(self, cephadm_module: CephadmOrchestrator):
+        """Test that None is returned when service doesn't exist"""
+        nvmeof_service = NvmeofService(cephadm_module)
+        bundle = nvmeof_service.get_nvmeof_tls_bundle('nvmeof.nonexistent')
+        assert bundle is None
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_unknown_cert_source(self, cephadm_module: CephadmOrchestrator):
+        """Test that None is returned for unknown certificate_source"""
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='unknown-source',
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+        bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+        assert bundle is None
+
+    @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+    def test_get_nvmeof_tls_bundle_reference_missing_certs(self, cephadm_module: CephadmOrchestrator):
+        """Test REFERENCE source when certs are not stored in certmgr"""
+        spec = NvmeofServiceSpec(
+            service_id='test.group',
+            pool='testpool',
+            group='group',
+            ssl=True,
+            certificate_source='reference',
+            enable_auth=True,
+        )
+        self._store_spec(cephadm_module, spec)
+
+        nvmeof_service = NvmeofService(cephadm_module)
+
+        # Mock get_cert and get_key to return None for all lookups
+        with patch.object(cephadm_module.cert_mgr, 'get_cert', return_value=None), \
+             patch.object(cephadm_module.cert_mgr, 'get_key', return_value=None):
+            bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name())
+
+        assert bundle is not None
+        assert bundle.server_cert == ''
+        assert bundle.server_key == ''
+        assert bundle.client_cert == ''
+        assert bundle.client_key == ''
+        assert bundle.ca_cert == ''