From: Redouane Kachach Date: Tue, 10 Mar 2026 11:27:30 +0000 (+0100) Subject: mgr/cephadm: adding new UT to cover the new nvmeof API X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=83cd3c3040a60b389de08defdac52a50e6f55e09;p=ceph.git mgr/cephadm: adding new UT to cover the new nvmeof API Fixes: https://tracker.ceph.com/issues/74377 Signed-off-by: Redouane Kachach --- diff --git a/src/pybind/mgr/cephadm/tests/services/test_nvmeof.py b/src/pybind/mgr/cephadm/tests/services/test_nvmeof.py index b255f92531fd..326421a71b66 100644 --- a/src/pybind/mgr/cephadm/tests/services/test_nvmeof.py +++ b/src/pybind/mgr/cephadm/tests/services/test_nvmeof.py @@ -2,14 +2,23 @@ import json import pytest from unittest.mock import MagicMock, patch from typing import Dict, List +from ceph.utils import datetime_now -from cephadm.services.nvmeof import NvmeofService +from cephadm.services.nvmeof import NvmeofService, NVMEOF_CLIENT_CERT_LABEL from cephadm.module import CephadmOrchestrator from ceph.deployment.service_spec import NvmeofServiceSpec from cephadm.tests.fixtures import with_host, with_service, _run_cephadm, async_side_effect from orchestrator import OrchestratorError from cephadm.tlsobject_types import TLSCredentials +cephadm_root_ca = """-----BEGIN CERTIFICATE-----\nMIIE7DCCAtSgAwIBAgIUE8b2zZ64geu2ns3Zfn3/4L+Cf6MwDQYJKoZIhvcNAQEL\nBQAwFzEVMBMGA1UEAwwMY2VwaGFkbS1yb290MB4XDTI0MDYyNjE0NDA1M1oXDTM0\nMDYyNzE0NDA1M1owFzEVMBMGA1UEAwwMY2VwaGFkbS1yb290MIICIjANBgkqhkiG\n9w0BAQEFAAOCAg8AMIICCgKCAgEAsZRJsdtTr9GLG1lWFql5SGc46ldFanNJd1Gl\nqXq5vgZVKRDTmNgAb/XFuNEEmbDAXYIRZolZeYKMHfn0pouPRSel0OsC6/02ZUOW\nIuN89Wgo3IYleCFpkVIumD8URP3hwdu85plRxYZTtlruBaTRH38lssyCqxaOdEt7\nAUhvYhcMPJThB17eOSQ73mb8JEC83vB47fosI7IhZuvXvRSuZwUW30rJanWNhyZq\neS2B8qw2RSO0+77H6gA4ftBnitfsE1Y8/F9Z/f92JOZuSMQXUB07msznPbRJia3f\nueO8gOc32vxd1A1/Qzp14uX34yEGY9ko2lW226cZO29IVUtXOX+LueQttwtdlpz8\ne6Npm09pXhXAHxV/OW3M28MdXmobIqT/m9MfkeAErt5guUeC5y8doz6/3VQRjFEn\nRpN0WkblgnNAQ3DONPc+Qd9Fi/wZV2X7bXoYpNdoWDsEOiE/eLmhG1A2GqU/mneP\nzQ6u79nbdwTYpwqHpa+PvusXeLfKauzI8lLUJotdXy9EK8iHUofibB61OljYye6B\nG3b8C4QfGsw8cDb4APZd/6AZYyMx/V3cGZ+GcOV7WvsC8k7yx5Uqasm/kiGQ3EZo\nuNenNEYoGYrjb8D/8QzqNUTwlEh27/ps80tO7l2GGTvWVZL0PRZbmLDvO77amtOf\nOiRXMoUCAwEAAaMwMC4wGwYDVR0RBBQwEocQAAAAAAAAAAAAAAAAAAAAATAPBgNV\nHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4ICAQAxwzX5AhYEWhTV4VUwUj5+\nqPdl4Q2tIxRokqyE+cDxoSd+6JfGUefUbNyBxDt0HaBq8obDqqrbcytxnn7mpnDu\nhtiauY+I4Amt7hqFOiFA4cCLi2mfok6g2vL53tvhd9IrsfflAU2wy7hL76Ejm5El\nA+nXlkJwps01Whl9pBkUvIbOn3pXX50LT4hb5zN0PSu957rjd2xb4HdfuySm6nW4\n4GxtVWfmGA6zbC4XMEwvkuhZ7kD2qjkAguGDF01uMglkrkCJT3OROlNBuSTSBGqt\ntntp5VytHvb7KTF7GttM3ha8/EU2KYaHM6WImQQTrOfiImAktOk4B3lzUZX3HYIx\n+sByO4P4dCvAoGz1nlWYB2AvCOGbKf0Tgrh4t4jkiF8FHTXGdfvWmjgi1pddCNAy\nn65WOCmVmLZPERAHOk1oBwqyReSvgoCFo8FxbZcNxJdlhM0Z6hzKggm3O3Dl88Xl\n5euqJjh2STkBW8Xuowkg1TOs5XyWvKoDFAUzyzeLOL8YSG+gXV22gPTUaPSVAqdb\nwd0Fx2kjConuC5bgTzQHs8XWA930U3XWZraj21Vaa8UxlBLH4fUro8H5lMSYlZNE\nJHRNW8BkznAClaFSDG3dybLsrzrBFAu/Qb5zVkT1xyq0YkepGB7leXwq6vjWA5Pw\nmZbKSphWfh0qipoqxqhfkw==\n-----END CERTIFICATE-----\n""" + + +ceph_generated_cert = """-----BEGIN CERTIFICATE-----\nMIICxjCCAa4CEQDIZSujNBlKaLJzmvntjukjMA0GCSqGSIb3DQEBDQUAMCExDTAL\nBgNVBAoMBENlcGgxEDAOBgNVBAMMB2NlcGhhZG0wHhcNMjIwNzEzMTE0NzA3WhcN\nMzIwNzEwMTE0NzA3WjAhMQ0wCwYDVQQKDARDZXBoMRAwDgYDVQQDDAdjZXBoYWRt\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyyMe4DMA+MeYK7BHZMHB\nq7zjliEOcNgxomjU8qbf5USF7Mqrf6+/87XWqj4pCyAW8x0WXEr6A56a+cmBVmt+\nqtWDzl020aoId6lL5EgLLn6/kMDCCJLq++Lg9cEofMSvcZh+lY2f+1p+C+00xent\nrLXvXGOilAZWaQfojT2BpRnNWWIFbpFwlcKrlg2G0cFjV5c1m6a0wpsQ9JHOieq0\nSvwCixajwq3CwAYuuiU1wjI4oJO4Io1+g8yB3nH2Mo/25SApCxMXuXh4kHLQr/T4\n4hqisvG4uJYgKMcSIrWj5o25mclByGi1UI/kZkCUES94i7Z/3ihx4Bad0AMs/9tw\nFwIDAQABMA0GCSqGSIb3DQEBDQUAA4IBAQAf+pwz7Gd7mDwU2LY0TQXsK6/8KGzh\nHuX+ErOb8h5cOAbvCnHjyJFWf6gCITG98k9nxU9NToG0WYuNm/max1y/54f0dtxZ\npUo6KSNl3w6iYCfGOeUIj8isi06xMmeTgMNzv8DYhDt+P2igN6LenqWTVztogkiV\nxQ5ZJFFLEw4sN0CXnrZX3t5ruakxLXLTLKeE0I91YJvjClSBGkVJq26wOKQNHMhx\npWxeydQ5EgPZY+Aviz5Dnxe8aB7oSSovpXByzxURSabOuCK21awW5WJCGNpmqhWK\nZzACBDEstccj57c4OGV0eayHJRsluVr2e9NHRINZA3qdB37e6gsI1xHo\n-----END CERTIFICATE-----\n""" + + +ceph_generated_key = """-----BEGIN PRIVATE KEY-----\nMIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQDLIx7gMwD4x5gr\nsEdkwcGrvOOWIQ5w2DGiaNTypt/lRIXsyqt/r7/ztdaqPikLIBbzHRZcSvoDnpr5\nyYFWa36q1YPOXTbRqgh3qUvkSAsufr+QwMIIkur74uD1wSh8xK9xmH6VjZ/7Wn4L\n7TTF6e2ste9cY6KUBlZpB+iNPYGlGc1ZYgVukXCVwquWDYbRwWNXlzWbprTCmxD0\nkc6J6rRK/AKLFqPCrcLABi66JTXCMjigk7gijX6DzIHecfYyj/blICkLExe5eHiQ\nctCv9PjiGqKy8bi4liAoxxIitaPmjbmZyUHIaLVQj+RmQJQRL3iLtn/eKHHgFp3Q\nAyz/23AXAgMBAAECggEAVoTB3Mm8azlPlaQB9GcV3tiXslSn+uYJ1duCf0sV52dV\nBzKW8s5fGiTjpiTNhGCJhchowqxoaew+o47wmGc2TvqbpeRLuecKrjScD0GkCYyQ\neM2wlshEbz4FhIZdgS6gbuh9WaM1dW/oaZoBNR5aTYo7xYTmNNeyLA/jO2zr7+4W\n5yES1lMSBXpKk7bDGKYY4bsX2b5RLr2Grh2u2bp7hoLABCEvuu8tSQdWXLEXWpXo\njwmV3hc6tabypIa0mj2Dmn2Dmt1ppSO0AZWG/WAizN3f4Z0r/u9HnbVrVmh0IEDw\n3uf2LP5o3msG9qKCbzv3lMgt9mMr70HOKnJ8ohMSKQKBgQDLkNb+0nr152HU9AeJ\nvdz8BeMxcwxCG77iwZphZ1HprmYKvvXgedqWtS6FRU+nV6UuQoPUbQxJBQzrN1Qv\nwKSlOAPCrTJgNgF/RbfxZTrIgCPuK2KM8I89VZv92TSGi362oQA4MazXC8RAWjoJ\nSu1/PHzK3aXOfVNSLrOWvIYeZQKBgQD/dgT6RUXKg0UhmXj7ExevV+c7oOJTDlMl\nvLngrmbjRgPO9VxLnZQGdyaBJeRngU/UXfNgajT/MU8B5fSKInnTMawv/tW7634B\nw3v6n5kNIMIjJmENRsXBVMllDTkT9S7ApV+VoGnXRccbTiDapBThSGd0wri/CuwK\nNWK1YFOeywKBgEDyI/XG114PBUJ43NLQVWm+wx5qszWAPqV/2S5MVXD1qC6zgCSv\nG9NLWN1CIMimCNg6dm7Wn73IM7fzvhNCJgVkWqbItTLG6DFf3/DPODLx1wTMqLOI\nqFqMLqmNm9l1Nec0dKp5BsjRQzq4zp1aX21hsfrTPmwjxeqJZdioqy2VAoGAXR5X\nCCdSHlSlUW8RE2xNOOQw7KJjfWT+WAYoN0c7R+MQplL31rRU7dpm1bLLRBN11vJ8\nMYvlT5RYuVdqQSP6BkrX+hLJNBvOLbRlL+EXOBrVyVxHCkDe+u7+DnC4epbn+N8P\nLYpwqkDMKB7diPVAizIKTBxinXjMu5fkKDs5n+sCgYBbZheYKk5M0sIxiDfZuXGB\nkf4mJdEkTI1KUGRdCwO/O7hXbroGoUVJTwqBLi1tKqLLarwCITje2T200BYOzj82\nqwRkCXGtXPKnxYEEUOiFx9OeDrzsZV00cxsEnX0Zdj+PucQ/J3Cvd0dWUspJfLHJ\n39gnaegswnz9KMQAvzKFdg==\n-----END PRIVATE KEY-----\n""" + class FakeInventory: def get_addr(self, name: str) -> str: @@ -685,3 +694,334 @@ timeout = 1.0 error_ok=True, use_current_daemon_image=False, ) + + +class TestNvmeofTLSBundle: + def _store_spec(self, cephadm_module: CephadmOrchestrator, spec: NvmeofServiceSpec) -> None: + # SpecStore in unit tests stores ServiceSpec objects directly. + cephadm_module.spec_store._specs[spec.service_name()] = spec + # Some SpecStore helpers expect spec_created to exist. + if hasattr(cephadm_module.spec_store, 'spec_created'): + cephadm_module.spec_store.spec_created[spec.service_name()] = datetime_now() + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_ssl_disabled(self, cephadm_module: CephadmOrchestrator): + """Test that SSL disabled returns empty bundle""" + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=False, + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == '' + assert bundle.server_key == '' + assert bundle.client_cert == '' + assert bundle.client_key == '' + assert bundle.ca_cert == '' + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_inline_server_only(self, cephadm_module: CephadmOrchestrator): + """Test INLINE certificate source with server creds only (no mTLS)""" + server_cert = '-----BEGIN CERTIFICATE-----\nSERVER_CERT\n-----END CERTIFICATE-----' + server_key = '-----BEGIN PRIVATE KEY-----\nSERVER_KEY\n-----END PRIVATE KEY-----' + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='inline', + ssl_cert=server_cert, + ssl_key=server_key, + enable_auth=False, + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == server_cert + assert bundle.server_key == server_key + assert bundle.client_cert == '' + assert bundle.client_key == '' + assert bundle.ca_cert == '' + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_inline_with_mtls(self, cephadm_module: CephadmOrchestrator): + """Test INLINE certificate source with mTLS enabled""" + server_cert = '-----BEGIN CERTIFICATE-----\nSERVER_CERT\n-----END CERTIFICATE-----' + server_key = '-----BEGIN PRIVATE KEY-----\nSERVER_KEY\n-----END PRIVATE KEY-----' + client_cert = '-----BEGIN CERTIFICATE-----\nCLIENT_CERT\n-----END CERTIFICATE-----' + client_key = '-----BEGIN PRIVATE KEY-----\nCLIENT_KEY\n-----END PRIVATE KEY-----' + ca_cert = '-----BEGIN CERTIFICATE-----\nCA_CERT\n-----END CERTIFICATE-----' + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='inline', + ssl_cert=server_cert, + ssl_key=server_key, + enable_auth=True, + client_cert=client_cert, + client_key=client_key, + root_ca_cert=ca_cert, + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == server_cert + assert bundle.server_key == server_key + assert bundle.client_cert == client_cert + assert bundle.client_key == client_key + assert bundle.ca_cert == ca_cert + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_reference_server_only(self, cephadm_module: CephadmOrchestrator): + """Test REFERENCE certificate source with server creds only""" + server_cert = '-----BEGIN CERTIFICATE-----\nREF_SERVER_CERT\n-----END CERTIFICATE-----' + server_key = '-----BEGIN PRIVATE KEY-----\nREF_SERVER_KEY\n-----END PRIVATE KEY-----' + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='reference', + enable_auth=False, + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + + # Mock cert_mgr.get_cert and get_key for SERVICE-scoped lookups + def _get_cert(name, service_name=None, host=None): + if name == nvmeof_service.cert_name: + return server_cert + return None + + def _get_key(name, service_name=None, host=None): + if name == nvmeof_service.key_name: + return server_key + return None + + with patch.object(cephadm_module.cert_mgr, 'get_cert', side_effect=_get_cert), \ + patch.object(cephadm_module.cert_mgr, 'get_key', side_effect=_get_key): + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == server_cert + assert bundle.server_key == server_key + assert bundle.client_cert == '' + assert bundle.client_key == '' + assert bundle.ca_cert == '' + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_reference_with_mtls(self, cephadm_module: CephadmOrchestrator): + """Test REFERENCE certificate source with mTLS enabled""" + server_cert = '-----BEGIN CERTIFICATE-----\nREF_SERVER_CERT\n-----END CERTIFICATE-----' + server_key = '-----BEGIN PRIVATE KEY-----\nREF_SERVER_KEY\n-----END PRIVATE KEY-----' + client_cert = '-----BEGIN CERTIFICATE-----\nREF_CLIENT_CERT\n-----END CERTIFICATE-----' + client_key = '-----BEGIN PRIVATE KEY-----\nREF_CLIENT_KEY\n-----END PRIVATE KEY-----' + ca_cert = '-----BEGIN CERTIFICATE-----\nREF_CA_CERT\n-----END CERTIFICATE-----' + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='reference', + enable_auth=True, + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + + def _get_cert(name, service_name=None, host=None): + if name == nvmeof_service.cert_name: + return server_cert + if name == nvmeof_service.client_cert_name: + return client_cert + if name == nvmeof_service.ca_cert_name: + return ca_cert + return None + + def _get_key(name, service_name=None, host=None): + if name == nvmeof_service.key_name: + return server_key + if name == nvmeof_service.client_key_name: + return client_key + return None + + with patch.object(cephadm_module.cert_mgr, 'get_cert', side_effect=_get_cert), \ + patch.object(cephadm_module.cert_mgr, 'get_key', side_effect=_get_key): + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == server_cert + assert bundle.server_key == server_key + assert bundle.client_cert == client_cert + assert bundle.client_key == client_key + assert bundle.ca_cert == ca_cert + + @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service") + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_cephadm_signed_server_only(self, _pick_host, cephadm_module: CephadmOrchestrator): + """Test CEPHADM_SIGNED certificate source without mTLS""" + _pick_host.return_value = 'test-host' + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='cephadm-signed', + enable_auth=False, + ) + self._store_spec(cephadm_module, spec) + + server_creds = TLSCredentials( + cert=ceph_generated_cert, + key=ceph_generated_key, + ca_cert=cephadm_root_ca, + ) + + def _get_self_signed(service_name, hostname, label=None): + assert label is None + return server_creds + + nvmeof_service = NvmeofService(cephadm_module) + with patch.object(cephadm_module.cert_mgr, 'get_self_signed_tls_credentials', side_effect=_get_self_signed): + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == ceph_generated_cert + assert bundle.server_key == ceph_generated_key + assert bundle.ca_cert == cephadm_root_ca + assert bundle.client_cert == '' + assert bundle.client_key == '' + + @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service") + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_cephadm_signed_with_mtls(self, _pick_host, cephadm_module: CephadmOrchestrator): + """Test CEPHADM_SIGNED certificate source with mTLS enabled""" + _pick_host.return_value = 'test-host' + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='cephadm-signed', + enable_auth=True, + ) + self._store_spec(cephadm_module, spec) + + server_creds = TLSCredentials( + cert=ceph_generated_cert, + key=ceph_generated_key, + ca_cert=cephadm_root_ca, + ) + client_creds = TLSCredentials( + cert='-----BEGIN CERTIFICATE-----\nCLIENT_CERT\n-----END CERTIFICATE-----', + key='-----BEGIN PRIVATE KEY-----\nCLIENT_KEY\n-----END PRIVATE KEY-----', + ca_cert=cephadm_root_ca, + ) + + def _get_self_signed(service_name, hostname, label=None): + if label == NVMEOF_CLIENT_CERT_LABEL: + return client_creds + assert label is None + return server_creds + + nvmeof_service = NvmeofService(cephadm_module) + with patch.object(cephadm_module.cert_mgr, 'get_self_signed_tls_credentials', side_effect=_get_self_signed): + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == ceph_generated_cert + assert bundle.server_key == ceph_generated_key + assert bundle.client_cert == client_creds.cert + assert bundle.client_key == client_creds.key + # API returns the CA from server_creds + assert bundle.ca_cert == cephadm_root_ca + + @patch("cephadm.services.nvmeof.NvmeofService._pick_running_daemon_host_for_service") + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_cephadm_signed_no_hostname(self, _pick_host, cephadm_module: CephadmOrchestrator): + """Test CEPHADM_SIGNED returns None when no hostname can be resolved""" + _pick_host.return_value = None + + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='cephadm-signed', + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + assert bundle is None + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_service_not_found(self, cephadm_module: CephadmOrchestrator): + """Test that None is returned when service doesn't exist""" + nvmeof_service = NvmeofService(cephadm_module) + bundle = nvmeof_service.get_nvmeof_tls_bundle('nvmeof.nonexistent') + assert bundle is None + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_unknown_cert_source(self, cephadm_module: CephadmOrchestrator): + """Test that None is returned for unknown certificate_source""" + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='unknown-source', + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + assert bundle is None + + @patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) + def test_get_nvmeof_tls_bundle_reference_missing_certs(self, cephadm_module: CephadmOrchestrator): + """Test REFERENCE source when certs are not stored in certmgr""" + spec = NvmeofServiceSpec( + service_id='test.group', + pool='testpool', + group='group', + ssl=True, + certificate_source='reference', + enable_auth=True, + ) + self._store_spec(cephadm_module, spec) + + nvmeof_service = NvmeofService(cephadm_module) + + # Mock get_cert and get_key to return None for all lookups + with patch.object(cephadm_module.cert_mgr, 'get_cert', return_value=None), \ + patch.object(cephadm_module.cert_mgr, 'get_key', return_value=None): + bundle = nvmeof_service.get_nvmeof_tls_bundle(spec.service_name()) + + assert bundle is not None + assert bundle.server_cert == '' + assert bundle.server_key == '' + assert bundle.client_cert == '' + assert bundle.client_key == '' + assert bundle.ca_cert == ''